public class JdbcTokenStore extends Object implements TokenStore
TokenSchema.tokenTable() in which to store the tokens.| Modifier and Type | Class and Description |
|---|---|
static class |
JdbcTokenStore.Builder
Builder class to instantiate a
JdbcTokenStore. |
| Modifier | Constructor and Description |
|---|---|
protected |
JdbcTokenStore(JdbcTokenStore.Builder builder)
Instantiate a
JdbcTokenStore based on the fields contained in the JdbcTokenStore.Builder. |
| Modifier and Type | Method and Description |
|---|---|
static JdbcTokenStore.Builder |
builder()
Instantiate a Builder to be able to create a
JdbcTokenStore. |
protected TrackingToken |
claimToken(Connection connection,
AbstractTokenEntry<?> entry)
Tries to claim the given token
entry. |
void |
createSchema(TokenTableFactory schemaFactory)
Performs the DDL queries to create the schema necessary for this token store implementation.
|
protected PreparedStatement |
deleteToken(Connection connection,
String processorName,
int segment)
Creates a new
PreparedStatement to release the current claim this node has on a token belonging to a
processor with given processorName and segment. |
void |
deleteToken(String processorName,
int segment)
Deletes the token for the processor with given
processorName and segment. |
int[] |
fetchSegments(String processorName)
Returns an array of known
segments for a given processorName. |
TrackingToken |
fetchToken(String processorName,
int segment)
|
protected Connection |
getConnection()
Returns a
Connection to the database. |
void |
initializeSegment(TrackingToken token,
String processorName,
int segment)
Initializes a segment with given
segment for the processor with given processorName to contain
the given token. |
void |
initializeTokenSegments(String processorName,
int segmentCount)
Initializes the given
segmentCount number of segments for the given processorName to track its
tokens. |
void |
initializeTokenSegments(String processorName,
int segmentCount,
TrackingToken initialToken)
Initializes the given
segmentCount number of segments for the given processorName to track its
tokens. |
protected TrackingToken |
insertTokenEntry(Connection connection,
TrackingToken token,
String processorName,
int segment)
Inserts a new token entry via the given updatable
resultSet. |
protected TrackingToken |
loadToken(Connection connection,
ResultSet resultSet,
String processorName,
int segment)
Tries loading an existing token owned by a processor with given
processorName and segment. |
protected <T> T |
readSerializedData(ResultSet resultSet,
String columnName)
Returns the serialized token data from the given
resultSet at given columnName. |
protected AbstractTokenEntry<?> |
readTokenEntry(ResultSet resultSet)
Convert given
resultSet to an AbstractTokenEntry. |
protected PreparedStatement |
releaseClaim(Connection connection,
String processorName,
int segment)
Creates a new
PreparedStatement to release the current claim this node has on a token belonging to a
processor with given processorName and segment. |
void |
releaseClaim(String processorName,
int segment)
Release a claim of the token for given
processorName and segment. |
boolean |
requiresExplicitSegmentInitialization()
Indicates whether this TokenStore instance requires segments to be explicitly initialized, before any tokens
can be claimed for that segment.
|
Optional<String> |
retrieveStorageIdentifier()
Returns a unique identifier that uniquely identifies the storage location of the tokens in this store.
|
protected PreparedStatement |
select(Connection connection,
String processorName,
int segment,
boolean forUpdate)
Returns a
PreparedStatement to select a token entry from the underlying storage, either for updating
or just for reading. |
protected PreparedStatement |
selectForSegments(Connection connection,
String processorName)
Returns a
PreparedStatement to select all segments ids for a given processorName from the underlying
storage. |
protected PreparedStatement |
selectForUpdate(Connection connection,
String processorName,
int segment)
Returns a
PreparedStatement to select a token entry from the underlying storage. |
Serializer |
serializer()
Returns the serializer used by the Token Store to serialize tokens.
|
void |
storeToken(TrackingToken token,
String processorName,
int segment)
Stores the given
token in the store. |
protected PreparedStatement |
storeUpdate(Connection connection,
TrackingToken token,
String processorName,
int segment)
Returns a
PreparedStatement which updates the given token for the given processorName and
segment combination. |
protected void |
updateToken(Connection connection,
ResultSet resultSet,
TrackingToken token,
String processorName,
int segment)
If the given
resultSet has an entry, attempts to replace the token in the entry with the given
token and claim ownership. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitextendClaimprotected JdbcTokenStore(JdbcTokenStore.Builder builder)
JdbcTokenStore based on the fields contained in the JdbcTokenStore.Builder.
Will assert that the ConnectionProvider, Serializer, TokenSchema, claimTimeout,
nodeId and contentType are not null, and will throw an AxonConfigurationException
if any of them is null.
builder - the JdbcTokenStore.Builder used to instantiate a JdbcTokenStore instancepublic static JdbcTokenStore.Builder builder()
JdbcTokenStore.
The schema is defaulted to an TokenSchema, the claimTimeout to a 10 seconds duration,
nodeId is defaulted to the name of the managed bean for the runtime system of the Java virtual machine
and the contentType to a byte[] Class. The ConnectionProvider and
Serializer are hard requirements and as such should be provided.
JdbcTokenStorepublic void createSchema(TokenTableFactory schemaFactory)
schemaFactory - factory of the token entry schemapublic void initializeTokenSegments(String processorName, int segmentCount) throws UnableToClaimTokenException
TokenStoresegmentCount number of segments for the given processorName to track its
tokens. This method should only be invoked when no tokens have been stored for the given processor, yet.
This method will initialize the tokens, but not claim them. It will create the segments ranging from 0
until segmentCount - 1.
The exact behavior when this method is called while tokens were already present, is undefined in case the token already present is not owned by the initializing process.
initializeTokenSegments in interface TokenStoreprocessorName - The name of the processor to initialize segments forsegmentCount - The number of segments to initializeUnableToClaimTokenException - when a segment has already been createdpublic void initializeTokenSegments(String processorName, int segmentCount, TrackingToken initialToken) throws UnableToClaimTokenException
TokenStoresegmentCount number of segments for the given processorName to track its
tokens. This method should only be invoked when no tokens have been stored for the given processor, yet.
This method will store initialToken for all segments as starting point for processor, but not claim them.
It will create the segments ranging from 0 until segmentCount - 1.
The exact behavior when this method is called while tokens were already present, is undefined in case the token already present is not owned by the initializing process.
initializeTokenSegments in interface TokenStoreprocessorName - The name of the processor to initialize segments forsegmentCount - The number of segments to initializeinitialToken - The initial token which is used as a starting point for processorUnableToClaimTokenException - when a segment has already been createdpublic void initializeSegment(TrackingToken token, String processorName, int segment) throws UnableToInitializeTokenException
TokenStoresegment for the processor with given processorName to contain
the given token.
This method fails if a Token already exists for the given processor and segment, even if that token has been claimed by the active instance.
This method will not claim the initialized segment. Use TokenStore.fetchToken(String, int) to retrieve and claim
the token.
initializeSegment in interface TokenStoretoken - The token to initialize the segment withprocessorName - The name of the processor to create the segment forsegment - The identifier of the segment to initializeUnableToInitializeTokenException - if a Token already existspublic boolean requiresExplicitSegmentInitialization()
TokenStorerequiresExplicitSegmentInitialization in interface TokenStoretrue if this instance requires tokens to be explicitly initialized, otherwise false.TokenStore.initializeTokenSegments(String, int),
TokenStore.initializeTokenSegments(String, int, TrackingToken),
TokenStore.initializeSegment(TrackingToken, String, int)public Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdentifierException
TokenStore
Note that this method may require the implementation to consult its underlying storage. Therefore, a Transaction
should be active when this method is called, similarly to invocations like TokenStore.fetchToken(String, int),
TokenStore.fetchSegments(String), etc. When no Transaction is active, the behavior is undefined.
retrieveStorageIdentifier in interface TokenStoreUnableToRetrieveIdentifierException - when the implementation was unable to determine its identifierpublic Serializer serializer()
public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException
TokenStoretoken in the store. The token marks the current position of the process with given
processorName and segment. The given token may be null.
Any claims made by the current process have their timestamp updated.
This method should throw an UnableToClaimTokenException when the given segment has not been
initialized with a Token (albeit null) yet. In that case, a segment must have been explicitly initialized.
A TokenStore implementation's ability to do so is exposed by the TokenStore.requiresExplicitSegmentInitialization()
method. If that method returns false, this method may implicitly initialize a token and return that token upon
invocation.
storeToken in interface TokenStoretoken - The token to store for a given process and segment. May be null.processorName - The name of the process for which to store the tokensegment - The index of the segment for which to store the tokenUnableToClaimTokenException - when the token being updated has been claimed by another process.public TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException
TokenStoretoken for the given processorName and segment.
Returns null if the stored token for the given process and segment is
null.
This method should throw an UnableToClaimTokenException when the given segment has not been
initialized with a Token (albeit null) yet. In that case, a segment must have been explicitly initialized.
A TokenStore implementation's ability to do so is exposed by the TokenStore.requiresExplicitSegmentInitialization()
method. If that method returns false, this method may implicitly initialize a token and return that token upon
invocation.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release
the claim, use TokenStore.releaseClaim(String, int)
fetchToken in interface TokenStoreprocessorName - The process name for which to fetch the tokensegment - The segment index for which to fetch the tokennull if the store holds no token for given process and segmentUnableToClaimTokenException - if there is a token for given processorName and segment, but
they are claimed by another process.public void releaseClaim(String processorName, int segment)
TokenStoreprocessorName and segment. If no such claim existed,
nothing happens.
The caller must ensure not to use any streams opened based on the token for which the claim is released.
releaseClaim in interface TokenStoreprocessorName - The name of the process owning the token (e.g. a TrackingEventProcessor name)segment - the segment for which a token was obtainedpublic void deleteToken(String processorName, int segment)
TokenStoreprocessorName and segment. The token must
be owned by the current node, to be able to delete it.
Implementations should implement this method only when TokenStore.requiresExplicitSegmentInitialization() is overridden to
return true. Deleting tokens using implementations that do not require explicit token initialization is
unsafe, as a claim will automatically recreate the deleted token instance, which may result in concurrency
issues.
deleteToken in interface TokenStoreprocessorName - The name of the processor to remove the token forsegment - The segment to deletepublic int[] fetchSegments(String processorName)
TokenStoresegments for a given processorName.
The segments returned are segments for which a token has been stored previously. When the TokenStore is
empty, an empty array is returned.
fetchSegments in interface TokenStoreprocessorName - The process name for which to fetch the segmentsprotected PreparedStatement selectForSegments(Connection connection, String processorName) throws SQLException
PreparedStatement to select all segments ids for a given processorName from the underlying
storage.connection - the connection to the underlying databaseprocessorName - the name of the processor to fetch the segments forPreparedStatement that will fetch segments when executedSQLException - when an exception occurs while creating the prepared statementprotected PreparedStatement storeUpdate(Connection connection, TrackingToken token, String processorName, int segment) throws SQLException
PreparedStatement which updates the given token for the given processorName and
segment combination.connection - the connection to the underlying databasetoken - the new token to storeprocessorName - the name of the processor executing the updatesegment - the segment of the processor to executing the updatePreparedStatement that will update a token entry when executedSQLException - when an exception occurs while creating the prepared statementprotected PreparedStatement selectForUpdate(Connection connection, String processorName, int segment) throws SQLException
PreparedStatement to select a token entry from the underlying storage. The ResultSet
that is returned when this statement is executed should be updatable.connection - the connection to the underlying databaseprocessorName - the name of the processor to fetch the entry forsegment - the segment of the processor to fetch the entry forPreparedStatement that will fetch an updatable token entry when executedSQLException - when an exception occurs while creating the prepared statementprotected PreparedStatement select(Connection connection, String processorName, int segment, boolean forUpdate) throws SQLException
PreparedStatement to select a token entry from the underlying storage, either for updating
or just for reading.connection - the connection to the underlying databaseprocessorName - the name of the processor to fetch the entry forsegment - the segment of the processor to fetch the entry forforUpdate - whether the returned token should be updatablePreparedStatement that will fetch an updatable token entry when executedSQLException - when an exception occurs while creating the prepared statementprotected void updateToken(Connection connection, ResultSet resultSet, TrackingToken token, String processorName, int segment) throws SQLException
resultSet has an entry, attempts to replace the token in the entry with the given
token and claim ownership.connection - the connection to the underlying databaseresultSet - the updatable query result set of an executed PreparedStatementtoken - the token for the new or updated entryprocessorName - the name of the processor owning the tokensegment - the segment of the processor owning the tokenUnableToClaimTokenException - if the token cannot be claimed because another node currently owns the tokenSQLException - when an exception occurs while updating the result setprotected TrackingToken claimToken(Connection connection, AbstractTokenEntry<?> entry) throws SQLException
entry. If the claim fails an UnableToClaimTokenException should be
thrown. Otherwise the given resultSet should be updated to reflect the claim.connection - the connection to the underlying databaseentry - the entry extracted from the given result setUnableToClaimTokenException - if the token cannot be claimed because another node currently owns the tokenSQLException - when an exception occurs while claiming the token entryprotected TrackingToken loadToken(Connection connection, ResultSet resultSet, String processorName, int segment) throws SQLException
processorName and segment. If
such a token entry exists an attempt will be made to claim the token. If that succeeds the token will be
returned. If the token is already owned by another node an UnableToClaimTokenException will be thrown.
If no such token exists yet, a new token entry will be inserted with null token owned by this node and
return null.
connection - the connection to the underlying databaseresultSet - the updatable result set from a prior select for update queryprocessorName - the name of the processor to load or insert a token entry forsegment - the segment of the processor to load or insert a token entry fornull if a new entry was insertedUnableToClaimTokenException - if the token cannot be claimed because another node currently owns the tokenSQLException - when an exception occurs while loading or inserting the entryprotected TrackingToken insertTokenEntry(Connection connection, TrackingToken token, String processorName, int segment) throws SQLException
resultSet.connection - the connection to the underlying databasetoken - the token of the entry to insertprocessorName - the name of the processor to insert a token forsegment - the segment of the processor to insert a token forSQLException - when an exception occurs while inserting a token entryprotected AbstractTokenEntry<?> readTokenEntry(ResultSet resultSet) throws SQLException
resultSet to an AbstractTokenEntry. The result set contains a single token entry.resultSet - the result set of a prior select statement containing a single token entrySQLException - if the result set cannot be converted to an entryprotected PreparedStatement releaseClaim(Connection connection, String processorName, int segment) throws SQLException
PreparedStatement to release the current claim this node has on a token belonging to a
processor with given processorName and segment.connection - the connection that should be used to create a PreparedStatementprocessorName - the name of the processor for which to release this node's claimsegment - the segment of the processor for which to release this node's claimPreparedStatement that will release the claim this node has on the token entrySQLException - if the statement to release a claim cannot be createdprotected PreparedStatement deleteToken(Connection connection, String processorName, int segment) throws SQLException
PreparedStatement to release the current claim this node has on a token belonging to a
processor with given processorName and segment.connection - the connection that should be used to create a PreparedStatementprocessorName - the name of the processor for which to release this node's claimsegment - the segment of the processor for which to release this node's claimPreparedStatement that will release the claim this node has on the token entrySQLException - if the statement to release a claim cannot be createdprotected <T> T readSerializedData(ResultSet resultSet, String columnName) throws SQLException
resultSet at given columnName.T - the type of data to returnresultSet - the result set to get serialized data fromcolumnName - the name of the column containing the serialized tokenSQLException - if the token cannot be read from the entryprotected Connection getConnection()
Connection to the database.Copyright © 2010–2020. All rights reserved.