public class JdbcTokenStore extends Object implements TokenStore
TokenStore implementation that uses JDBC to save and load TrackingToken instances.
Before using this store, make sure the database contains a table named TokenSchema.tokenTable() in which to store the tokens. For convenience, this table can be constructed through the createSchema(TokenTableFactory) operation.
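As a rough illustration of the kind of table this store expects, the sketch below assembles a CREATE TABLE statement with plain JDK code. The table name, column names and SQL types are placeholders chosen for this example and are not confirmed by this page; in practice they would come from the TokenSchema and the TokenTableFactory passed to createSchema.

```java
import java.util.StringJoiner;

// Hypothetical sketch: every table name, column name and SQL type is an assumed placeholder.
final class TokenTableDdlSketch {

    static String createTableSql(String table, String processorNameColumn, String segmentColumn,
                                 String tokenColumn, String ownerColumn, String timestampColumn) {
        StringJoiner columns = new StringJoiner(", ", "(", ")");
        columns.add(processorNameColumn + " VARCHAR(255) NOT NULL");
        columns.add(segmentColumn + " INTEGER NOT NULL");
        columns.add(tokenColumn + " BLOB NULL");
        columns.add(ownerColumn + " VARCHAR(255) NULL");
        columns.add(timestampColumn + " VARCHAR(255) NULL");
        // Assumption: one row per processor/segment pair, so that pair is used as the primary key.
        columns.add("PRIMARY KEY (" + processorNameColumn + ", " + segmentColumn + ")");
        return "CREATE TABLE IF NOT EXISTS " + table + " " + columns;
    }

    public static void main(String[] args) {
        System.out.println(createTableSql(
                "TokenEntry", "processorName", "segment", "token", "owner", "timestamp"));
    }
}
```

Whether a hand-written statement like this is needed depends on the database; the createSchema operation mentioned above is the route this page itself suggests.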
Modifier and Type | Class and Description |
---|---|
static class | JdbcTokenStore.Builder Builder class to instantiate a JdbcTokenStore. |
Modifier | Constructor and Description |
---|---|
protected | JdbcTokenStore(JdbcTokenStore.Builder builder) Instantiate a JdbcTokenStore based on the fields contained in the JdbcTokenStore.Builder. |
Modifier and Type | Method and Description |
---|---|
static JdbcTokenStore.Builder | builder() Instantiate a Builder to be able to create a JdbcTokenStore. |
protected TrackingToken | claimToken(Connection connection, AbstractTokenEntry<?> entry) Tries to claim the given token entry. |
void | createSchema(TokenTableFactory schemaFactory) Performs the DDL queries to create the schema necessary for this token store implementation. |
protected PreparedStatement | deleteToken(Connection connection, String processorName, int segment) Creates a new PreparedStatement to delete the token belonging to a processor with given processorName and segment. |
void | deleteToken(String processorName, int segment) Deletes the token for the processor with given processorName and segment. |
List<Segment> | fetchAvailableSegments(String processorName) Returns a List of known available segments for a given processorName. |
int[] | fetchSegments(String processorName) Returns an array of known segments for a given processorName. |
TrackingToken | fetchToken(String processorName, int segment) |
TrackingToken | fetchToken(String processorName, Segment segment) |
protected Connection | getConnection() Returns a Connection to the database. |
void | initializeSegment(TrackingToken token, String processorName, int segment) Initializes a segment with given segment for the processor with given processorName to contain the given token. |
void | initializeTokenSegments(String processorName, int segmentCount) Initializes the given segmentCount number of segments for the given processorName to track its tokens. |
void | initializeTokenSegments(String processorName, int segmentCount, TrackingToken initialToken) Initializes the given segmentCount number of segments for the given processorName to track its tokens. |
protected TrackingToken | insertTokenEntry(Connection connection, TrackingToken token, String processorName, int segment) Inserts a new token entry using the given connection. |
protected TrackingToken | loadToken(Connection connection, ResultSet resultSet, String processorName, int segment) Tries loading an existing token owned by a processor with given processorName and segment. |
protected TrackingToken | loadToken(Connection connection, ResultSet resultSet, String processorName, Segment segment) Tries loading an existing token owned by a processor with given processorName and segment. |
protected <T> T | readSerializedData(ResultSet resultSet, String columnName) Returns the serialized token data from the given resultSet at given columnName. |
protected AbstractTokenEntry<?> | readTokenEntry(ResultSet resultSet) Convert given resultSet to an AbstractTokenEntry. |
protected PreparedStatement | releaseClaim(Connection connection, String processorName, int segment) Creates a new PreparedStatement to release the current claim this node has on a token belonging to a processor with given processorName and segment. |
void | releaseClaim(String processorName, int segment) Release a claim of the token for given processorName and segment. |
boolean | requiresExplicitSegmentInitialization() Indicates whether this TokenStore instance requires segments to be explicitly initialized, before any tokens can be claimed for that segment. |
Optional<String> | retrieveStorageIdentifier() Returns a unique identifier that uniquely identifies the storage location of the tokens in this store. |
protected PreparedStatement | select(Connection connection, String processorName, int segment, boolean forUpdate) Returns a PreparedStatement to select a token entry from the underlying storage, either for updating or just for reading. |
protected PreparedStatement | selectForSegments(Connection connection, String processorName) Returns a PreparedStatement to select all segment ids for a given processorName from the underlying storage. |
protected PreparedStatement | selectForUpdate(Connection connection, String processorName, int segment) Returns a PreparedStatement to select a token entry from the underlying storage. |
protected PreparedStatement | selectSegments(Connection connection, String processorName, int splitSegmentId, int mergeableSegmentId) Returns a PreparedStatement for the count of segments that can be found after searching for the splitSegmentId and mergeableSegmentId. |
protected PreparedStatement | selectTokenEntries(Connection connection, String processorName) Returns a PreparedStatement to select all TokenEntries for a given processorName from the underlying storage. |
Serializer | serializer() Returns the serializer used by the Token Store to serialize tokens. |
void | storeToken(TrackingToken token, String processorName, int segment) Stores the given token in the store. |
protected PreparedStatement | storeUpdate(Connection connection, TrackingToken token, String processorName, int segment) Returns a PreparedStatement which updates the given token for the given processorName and segment combination. |
protected void | updateToken(Connection connection, ResultSet resultSet, TrackingToken token, String processorName, int segment) If the given resultSet has an entry, attempts to replace the token in the entry with the given token and claim ownership. |
protected void | validateSegment(String processorName, Segment segment) Validate a segment by checking for the existence of a split or merge candidate segment. |
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface TokenStore: extendClaim
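The protected select(Connection, String, int, boolean) method summarized above returns a PreparedStatement that reads a token entry either for updating or just for reading. The sketch below shows one way such a statement could be assembled with plain JDBC. The table and column names are placeholders, and appending FOR UPDATE is an assumption about how row locking might be requested; not every database accepts that clause.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical sketch: table/column names are assumed; real values would come from TokenSchema.
final class SelectStatementSketch {

    static String selectSql(boolean forUpdate) {
        String sql = "SELECT processorName, segment, token, owner, timestamp FROM TokenEntry"
                + " WHERE processorName = ? AND segment = ?";
        // Assumption: the lock is requested with a trailing FOR UPDATE clause.
        return forUpdate ? sql + " FOR UPDATE" : sql;
    }

    static PreparedStatement select(Connection connection, String processorName,
                                    int segment, boolean forUpdate) throws SQLException {
        // An updatable result set is only requested when the caller intends to modify the row.
        PreparedStatement statement = connection.prepareStatement(
                selectSql(forUpdate),
                ResultSet.TYPE_FORWARD_ONLY,
                forUpdate ? ResultSet.CONCUR_UPDATABLE : ResultSet.CONCUR_READ_ONLY);
        statement.setString(1, processorName);
        statement.setInt(2, segment);
        return statement;
    }
}
```

Because these factory methods are protected, a subclass could override one of them to adapt the SQL to a particular database dialect.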
protected JdbcTokenStore(JdbcTokenStore.Builder builder)
Instantiate a JdbcTokenStore based on the fields contained in the JdbcTokenStore.Builder.
Will assert that the ConnectionProvider, Serializer, TokenSchema, claimTimeout, nodeId and contentType are not null, and will throw an AxonConfigurationException if any of them is null.
Parameters:
builder - the JdbcTokenStore.Builder used to instantiate a JdbcTokenStore instance
public static JdbcTokenStore.Builder builder()
Instantiate a Builder to be able to create a JdbcTokenStore.
The schema defaults to a TokenSchema, the claimTimeout to a duration of 10 seconds, the nodeId to the name of the managed bean for the runtime system of the Java virtual machine, and the contentType to the byte[] Class. The ConnectionProvider and Serializer are hard requirements and as such should be provided.
Returns:
a Builder to be able to create a JdbcTokenStore
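To make the defaults described above concrete, here is a minimal JDK-only sketch of how such values can be obtained. BuilderDefaultsSketch is a hypothetical holder class, not the real JdbcTokenStore.Builder, and reading "the name of the managed bean for the runtime system" as RuntimeMXBean.getName() is an assumption.

```java
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.time.temporal.TemporalAmount;

// Hypothetical holder mirroring the documented defaults; it is not the real Builder.
final class BuilderDefaultsSketch {
    // Claim timeout of 10 seconds.
    final TemporalAmount claimTimeout = Duration.ofSeconds(10);
    // Name of the runtime MXBean of this JVM (assumed to be what "nodeId" defaults to).
    final String nodeId = ManagementFactory.getRuntimeMXBean().getName();
    // Serialized tokens are represented as byte arrays.
    final Class<?> contentType = byte[].class;

    public static void main(String[] args) {
        BuilderDefaultsSketch defaults = new BuilderDefaultsSketch();
        System.out.println(defaults.claimTimeout + " " + defaults.nodeId + " " + defaults.contentType);
    }
}
```

The ConnectionProvider and Serializer have no counterpart here because, as stated above, they have no default and must be supplied.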
public void createSchema(TokenTableFactory schemaFactory)
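Performs the DDL queries to create the schema necessary for this token store implementation.
Parameters:
schemaFactory - factory of the token entry schema
public void initializeTokenSegments(@Nonnull String processorName, int segmentCount) throws UnableToClaimTokenException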
Description copied from interface: TokenStore
Initializes the given segmentCount number of segments for the given processorName to track its tokens. This method should only be invoked when no tokens have been stored for the given processor yet.
This method will initialize the tokens, but not claim them. It will create the segments ranging from 0 until segmentCount - 1.
The exact behavior when this method is called while tokens were already present is undefined in case the token already present is not owned by the initializing process.
Specified by:
initializeTokenSegments in interface TokenStore
Parameters:
processorName - The name of the processor to initialize segments for
segmentCount - The number of segments to initialize
Throws:
UnableToClaimTokenException - when a segment has already been created
public void initializeTokenSegments(@Nonnull String processorName, int segmentCount, TrackingToken initialToken) throws UnableToClaimTokenException
Description copied from interface: TokenStore
Initializes the given segmentCount number of segments for the given processorName to track its tokens. This method should only be invoked when no tokens have been stored for the given processor yet.
This method will store initialToken for all segments as the starting point for the processor, but not claim them. It will create the segments ranging from 0 until segmentCount - 1.
The exact behavior when this method is called while tokens were already present is undefined in case the token already present is not owned by the initializing process.
Specified by:
initializeTokenSegments in interface TokenStore
Parameters:
processorName - The name of the processor to initialize segments for
segmentCount - The number of segments to initialize
initialToken - The initial token which is used as a starting point for the processor
Throws:
UnableToClaimTokenException - when a segment has already been created
public void initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, int segment) throws UnableToInitializeTokenException
Description copied from interface: TokenStore
Initializes a segment with given segment for the processor with given processorName to contain the given token.
This method fails if a Token already exists for the given processor and segment, even if that token has been claimed by the active instance.
This method will not claim the initialized segment. Use TokenStore.fetchToken(String, int) to retrieve and claim the token.
Specified by:
initializeSegment in interface TokenStore
Parameters:
token - The token to initialize the segment with
processorName - The name of the processor to create the segment for
segment - The identifier of the segment to initialize
Throws:
UnableToInitializeTokenException - if a Token already exists
public boolean requiresExplicitSegmentInitialization()
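Description copied from interface: TokenStore
Indicates whether this TokenStore instance requires segments to be explicitly initialized, before any tokens can be claimed for that segment.
Specified by:
requiresExplicitSegmentInitialization in interface TokenStore
Returns:
true if this instance requires tokens to be explicitly initialized, otherwise false.
See Also:
TokenStore.initializeTokenSegments(String, int), TokenStore.initializeTokenSegments(String, int, TrackingToken), TokenStore.initializeSegment(TrackingToken, String, int)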
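The initialization rules described above (segments numbered from 0 until segmentCount - 1, initialization without claiming, and failure when a segment already exists) can be sketched with an in-memory map. This is only an illustration under stated assumptions: SegmentInitSketch and its members are hypothetical, no JDBC is involved, and IllegalStateException stands in for the library's UnableToClaimTokenException and UnableToInitializeTokenException.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the token table; all names here are hypothetical.
final class SegmentInitSketch {
    // Key: processorName + "#" + segment; value: the stored token (may be null).
    private final Map<String, Object> tokens = new HashMap<>();

    private static String key(String processorName, int segment) {
        return processorName + "#" + segment;
    }

    void initializeTokenSegments(String processorName, int segmentCount, Object initialToken) {
        // Check first, so that a failure leaves the store unchanged.
        for (int segment = 0; segment < segmentCount; segment++) {
            if (tokens.containsKey(key(processorName, segment))) {
                throw new IllegalStateException("Segment " + segment + " already exists");
            }
        }
        // Create segments 0 .. segmentCount - 1, each holding the initial token, without claiming them.
        for (int segment = 0; segment < segmentCount; segment++) {
            tokens.put(key(processorName, segment), initialToken);
        }
    }

    void initializeSegment(Object token, String processorName, int segment) {
        if (tokens.containsKey(key(processorName, segment))) {
            throw new IllegalStateException("Segment " + segment + " already exists");
        }
        tokens.put(key(processorName, segment), token);
    }

    int segmentCount(String processorName) {
        int count = 0;
        for (String storedKey : tokens.keySet()) {
            if (storedKey.startsWith(processorName + "#")) {
                count++;
            }
        }
        return count;
    }
}
```

A null token is a valid initial value in this sketch, matching the nullable token parameter of initializeSegment.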
public Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdentifierException
Description copied from interface: TokenStore
Returns a unique identifier that uniquely identifies the storage location of the tokens in this store.
Note that this method may require the implementation to consult its underlying storage. Therefore, a Transaction should be active when this method is called, similarly to invocations like TokenStore.fetchToken(String, int), TokenStore.fetchSegments(String), etc. When no Transaction is active, the behavior is undefined.
Specified by:
retrieveStorageIdentifier in interface TokenStore
Throws:
UnableToRetrieveIdentifierException - when the implementation was unable to determine its identifier
public Serializer serializer()
Returns the serializer used by the Token Store to serialize tokens.
public void storeToken(TrackingToken token, @Nonnull String processorName, int segment) throws UnableToClaimTokenException
Description copied from interface: TokenStore
Stores the given token in the store. The token marks the current position of the process with given processorName and segment. The given token may be null.
Any claims made by the current process have their timestamp updated.
This method should throw an UnableToClaimTokenException when the given segment has not been initialized with a Token (albeit null) yet. In that case, a segment must have been explicitly initialized. A TokenStore implementation's ability to do so is exposed by the TokenStore.requiresExplicitSegmentInitialization() method. If that method returns false, this method may implicitly initialize a token and return that token upon invocation.
Specified by:
storeToken in interface TokenStore
Parameters:
token - The token to store for a given process and segment. May be null.
processorName - The name of the process for which to store the token
segment - The index of the segment for which to store the token
Throws:
UnableToClaimTokenException - when the token being updated has been claimed by another process.
public TrackingToken fetchToken(@Nonnull String processorName, int segment) throws UnableToClaimTokenException
Description copied from interface: TokenStore
Returns the last stored token for the given processorName and segment. Returns null if the stored token for the given process and segment is null.
This method should throw an UnableToClaimTokenException when the given segment has not been initialized with a Token (albeit null) yet. In that case, a segment must have been explicitly initialized. A TokenStore implementation's ability to do so is exposed by the TokenStore.requiresExplicitSegmentInitialization() method. If that method returns false, this method may implicitly initialize a token and return that token upon invocation.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int).
Specified by:
fetchToken in interface TokenStore
Parameters:
processorName - The process name for which to fetch the token
segment - The segment index for which to fetch the token
Returns:
The last stored token, or null if the store holds no token for given process and segment
Throws:
UnableToClaimTokenException - if there is a token for given processorName and segment, but it is claimed by another process.
public TrackingToken fetchToken(@Nonnull String processorName, @Nonnull Segment segment) throws UnableToClaimTokenException
Description copied from interface: TokenStore
Returns the last stored token for the given processorName and segment. Returns null if the stored token for the given process and segment is null.
This method should throw an UnableToClaimTokenException when the given segment has not been initialized with a Token (albeit null) yet. In that case, a segment must have been explicitly initialized. A TokenStore implementation's ability to do so is exposed by the TokenStore.requiresExplicitSegmentInitialization() method. If that method returns false, this method may implicitly initialize a token and return that token upon invocation.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int).
Specified by:
fetchToken in interface TokenStore
Parameters:
processorName - The process name for which to fetch the token
segment - The segment for which to fetch the token
Returns:
The last stored token, or null if the store holds no token for given process and segment
Throws:
UnableToClaimTokenException - if there is a token for given processorName and segment, but it is claimed by another process, or if the segment has been split or merged concurrently
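The claim behavior described for storeToken and fetchToken (a token is claimed by one process at a time, and claims by the current process have their timestamp refreshed) can be sketched as a small in-memory entry. ClaimSketch is hypothetical, and the rule that a claim may be taken over once claimTimeout has elapsed is an assumption made for this illustration rather than something this page states.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical in-memory token entry; the expiry rule below is an assumption.
final class ClaimSketch {
    private String owner;      // null means unclaimed
    private Instant claimedAt; // set whenever owner is set
    private final Duration claimTimeout;

    ClaimSketch(Duration claimTimeout) {
        this.claimTimeout = claimTimeout;
    }

    // A node may claim when the entry is unclaimed, already its own, or the claim has expired.
    boolean mayClaim(String nodeId, Instant now) {
        return owner == null
                || owner.equals(nodeId)
                || claimedAt.plus(claimTimeout).isBefore(now);
    }

    // Takes or refreshes the claim; IllegalStateException stands in for UnableToClaimTokenException.
    void claim(String nodeId, Instant now) {
        if (!mayClaim(nodeId, now)) {
            throw new IllegalStateException("Token is claimed by " + owner);
        }
        owner = nodeId;
        claimedAt = now; // re-claiming by the same node refreshes the timestamp
    }

    // Releases the claim only if the given node holds it; otherwise nothing happens.
    void release(String nodeId) {
        if (nodeId.equals(owner)) {
            owner = null;
            claimedAt = null;
        }
    }
}
```

Passing the current time in as a parameter keeps the sketch deterministic; a real store would read the clock and persist owner and timestamp in the token table.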
public void releaseClaim(@Nonnull String processorName, int segment)
Description copied from interface: TokenStore
Releases a claim of the token for given processorName and segment. If no such claim existed, nothing happens.
The caller must ensure not to use any streams opened based on the token for which the claim is released.
Specified by:
releaseClaim in interface TokenStore
Parameters:
processorName - The name of the process owning the token (e.g. a TrackingEventProcessor name)
segment - the segment for which a token was obtained
public void deleteToken(@Nonnull String processorName, int segment)
Description copied from interface: TokenStore
Deletes the token for the processor with given processorName and segment. The token must be owned by the current node to be able to delete it.
Implementations should implement this method only when TokenStore.requiresExplicitSegmentInitialization() is overridden to return true. Deleting tokens using implementations that do not require explicit token initialization is unsafe, as a claim will automatically recreate the deleted token instance, which may result in concurrency issues.
Specified by:
deleteToken in interface TokenStore
Parameters:
processorName - The name of the processor to remove the token for
segment - The segment to delete
public int[] fetchSegments(@Nonnull String processorName)
Description copied from interface: TokenStore
Returns an array of known segments for a given processorName.
The segments returned are segments for which a token has been stored previously. When the TokenStore is empty, an empty array is returned.
Specified by:
fetchSegments in interface TokenStore
Parameters:
processorName - The process name for which to fetch the segments
public List<Segment> fetchAvailableSegments(@Nonnull String processorName)
Description copied from interface: TokenStore
Returns a List of known available segments for a given processorName. A segment is considered available if it is not claimed by any other event processor.
The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the TokenStore is empty, an empty list is returned.
By default, if this method is not implemented, we will return all segments instead, whether they are available or not.
Specified by:
fetchAvailableSegments in interface TokenStore
Parameters:
processorName - the processor's name for which to fetch the segments
Returns:
a List of available segments for the given processorName
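The difference between fetchSegments and fetchAvailableSegments described above can be illustrated with a small in-memory sketch. It assumes a hypothetical map from segment id to the owning node id (null when unclaimed) and, for brevity, ignores claim expiry; plain Integer ids stand in for Segment instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical view of the stored entries: segment id -> owning node id (null when unclaimed).
final class SegmentListingSketch {

    // All known segments, whether claimed or not; an empty store yields an empty array.
    static int[] fetchSegments(Map<Integer, String> ownersBySegment) {
        return new TreeMap<>(ownersBySegment).keySet().stream()
                .mapToInt(Integer::intValue)
                .toArray();
    }

    // Only segments that are unclaimed or claimed by the asking node itself.
    static List<Integer> fetchAvailableSegments(Map<Integer, String> ownersBySegment, String nodeId) {
        List<Integer> available = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : new TreeMap<>(ownersBySegment).entrySet()) {
            String owner = entry.getValue();
            if (owner == null || owner.equals(nodeId)) {
                available.add(entry.getKey());
            }
        }
        return available;
    }
}
```

Sorting through a TreeMap is a choice made for predictable output in this sketch; the page does not state an ordering for either method.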
protected PreparedStatement selectForSegments(Connection connection, String processorName) throws SQLException
Returns a PreparedStatement to select all segment ids for a given processorName from the underlying storage.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to fetch the segments for
Returns:
a PreparedStatement that will fetch segments when executed
Throws:
SQLException - when an exception occurs while creating the prepared statement
protected PreparedStatement selectTokenEntries(Connection connection, String processorName) throws SQLException
Returns a PreparedStatement to select all TokenEntries for a given processorName from the underlying storage.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to fetch the segments for
Returns:
a PreparedStatement that will fetch TokenEntries when executed
Throws:
SQLException - when an exception occurs while creating the prepared statement
protected PreparedStatement storeUpdate(Connection connection, TrackingToken token, String processorName, int segment) throws SQLException
Returns a PreparedStatement which updates the given token for the given processorName and segment combination.
Parameters:
connection - the connection to the underlying database
token - the new token to store
processorName - the name of the processor executing the update
segment - the segment of the processor executing the update
Returns:
a PreparedStatement that will update a token entry when executed
Throws:
SQLException - when an exception occurs while creating the prepared statement
protected PreparedStatement selectForUpdate(Connection connection, String processorName, int segment) throws SQLException
Returns a PreparedStatement to select a token entry from the underlying storage. The ResultSet that is returned when this statement is executed should be updatable.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to fetch the entry for
segment - the segment of the processor to fetch the entry for
Returns:
a PreparedStatement that will fetch an updatable token entry when executed
Throws:
SQLException - when an exception occurs while creating the prepared statement
protected PreparedStatement select(Connection connection, String processorName, int segment, boolean forUpdate) throws SQLException
Returns a PreparedStatement to select a token entry from the underlying storage, either for updating or just for reading.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to fetch the entry for
segment - the segment of the processor to fetch the entry for
forUpdate - whether the returned token should be updatable
Returns:
a PreparedStatement that will fetch a token entry when executed, updatable if forUpdate is true
Throws:
SQLException - when an exception occurs while creating the prepared statement
protected void updateToken(Connection connection, ResultSet resultSet, TrackingToken token, String processorName, int segment) throws SQLException
If the given resultSet has an entry, attempts to replace the token in the entry with the given token and claim ownership.
Parameters:
connection - the connection to the underlying database
resultSet - the updatable query result set of an executed PreparedStatement
token - the token for the new or updated entry
processorName - the name of the processor owning the token
segment - the segment of the processor owning the token
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token
SQLException - when an exception occurs while updating the result set
protected TrackingToken claimToken(Connection connection, AbstractTokenEntry<?> entry) throws SQLException
Tries to claim the given token entry. If the claim fails, an UnableToClaimTokenException should be thrown. Otherwise the given resultSet should be updated to reflect the claim.
Parameters:
connection - the connection to the underlying database
entry - the entry extracted from the given result set
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token
SQLException - when an exception occurs while claiming the token entry
protected TrackingToken loadToken(Connection connection, ResultSet resultSet, String processorName, int segment) throws SQLException
Tries loading an existing token owned by a processor with given processorName and segment. If such a token entry exists, an attempt will be made to claim the token. If that succeeds, the token will be returned. If the token is already owned by another node, an UnableToClaimTokenException will be thrown.
If no such token exists yet, a new token entry will be inserted with a null token, owned by this node, and this method returns null.
Parameters:
connection - the connection to the underlying database
resultSet - the updatable result set from a prior select for update query
processorName - the name of the processor to load or insert a token entry for
segment - the segment of the processor to load or insert a token entry for
Returns:
the token of the fetched entry, or null if a new entry was inserted
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token
SQLException - when an exception occurs while loading or inserting the entry
protected TrackingToken loadToken(Connection connection, ResultSet resultSet, String processorName, Segment segment) throws SQLException
Tries loading an existing token owned by a processor with given processorName and segment. If such a token entry exists, an attempt will be made to claim the token. If that succeeds, the token will be returned. If the token is already owned by another node, an UnableToClaimTokenException will be thrown.
If no such token exists yet, a new token entry will be inserted with a null token, owned by this node, and this method returns null.
If a token has been claimed, the segment will be validated by checking the database for the split and merge candidate segments. If a concurrent split or merge operation has been detected, the claim will be released and an UnableToClaimTokenException will be thrown.
Parameters:
connection - the connection to the underlying database
resultSet - the updatable result set from a prior select for update query
processorName - the name of the processor to load or insert a token entry for
segment - the segment of the processor to load or insert a token entry for
Returns:
the token of the fetched entry, or null if a new entry was inserted
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token or if the segment has been split or merged concurrently
SQLException - when an exception occurs while loading or inserting the entry
protected void validateSegment(String processorName, Segment segment)
Validate a segment by checking for the existence of a split or merge candidate segment.
If the segment has been split concurrently, the split segment candidate will be found, indicating that we have claimed an incorrect segment. If the segment has been merged concurrently, the merge candidate segment will no longer exist, also indicating that we have claimed an incorrect segment.
Parameters:
processorName - the name of the processor to load or insert a token entry for
segment - the segment of the processor to load or insert a token entry for
protected PreparedStatement selectSegments(Connection connection, String processorName, int splitSegmentId, int mergeableSegmentId) throws SQLException
Returns a PreparedStatement for the count of segments that can be found after searching for the splitSegmentId and mergeableSegmentId.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to load or insert a token entry for
splitSegmentId - the id of the split candidate segment
mergeableSegmentId - the id of the merge candidate segment
Throws:
SQLException - when an exception occurs in building the PreparedStatement
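The split and merge check described for validateSegment can be sketched without a database. In this illustration a set of stored segment ids stands in for the result of the selectSegments query; SegmentValidationSketch and its method are hypothetical names, and the example ids are made up.

```java
import java.util.Set;

// Hypothetical sketch of the validation rule; a Set of ids replaces the database query.
final class SegmentValidationSketch {

    // Returns true when the claimed segment is still consistent with the stored segments.
    static boolean isValid(Set<Integer> storedSegmentIds, int splitSegmentId, int mergeableSegmentId) {
        // A stored split candidate means the claimed segment was split concurrently.
        boolean splitCandidateExists = storedSegmentIds.contains(splitSegmentId);
        // A missing merge candidate means the claimed segment was merged concurrently.
        boolean mergeCandidateExists = storedSegmentIds.contains(mergeableSegmentId);
        return !splitCandidateExists && mergeCandidateExists;
    }
}
```

For example, with stored ids {0, 1}, a split candidate of 2 and a merge candidate of 1, the claimed segment is considered valid; adding id 2 to the store, or removing id 1, makes the check fail.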
protected TrackingToken insertTokenEntry(Connection connection, TrackingToken token, String processorName, int segment) throws SQLException
Inserts a new token entry using the given connection.
Parameters:
connection - the connection to the underlying database
token - the token of the entry to insert
processorName - the name of the processor to insert a token for
segment - the segment of the processor to insert a token for
Throws:
SQLException - when an exception occurs while inserting a token entry
protected AbstractTokenEntry<?> readTokenEntry(ResultSet resultSet) throws SQLException
Convert given resultSet to an AbstractTokenEntry. The result set contains a single token entry.
Parameters:
resultSet - the result set of a prior select statement containing a single token entry
Throws:
SQLException - if the result set cannot be converted to an entry
protected PreparedStatement releaseClaim(Connection connection, String processorName, int segment) throws SQLException
Creates a new PreparedStatement to release the current claim this node has on a token belonging to a processor with given processorName and segment.
Parameters:
connection - the connection that should be used to create a PreparedStatement
processorName - the name of the processor for which to release this node's claim
segment - the segment of the processor for which to release this node's claim
Returns:
a PreparedStatement that will release the claim this node has on the token entry
Throws:
SQLException - if the statement to release a claim cannot be created
protected PreparedStatement deleteToken(Connection connection, String processorName, int segment) throws SQLException
Creates a new PreparedStatement to delete the token belonging to a processor with given processorName and segment.
Parameters:
connection - the connection that should be used to create a PreparedStatement
processorName - the name of the processor for which to delete the token
segment - the segment of the processor for which to delete the token
Returns:
a PreparedStatement that will delete the token entry when executed
Throws:
SQLException - if the statement to delete the token cannot be created
protected <T> T readSerializedData(ResultSet resultSet, String columnName) throws SQLException
Returns the serialized token data from the given resultSet at given columnName.
Type Parameters:
T - the type of data to return
Parameters:
resultSet - the result set to get serialized data from
columnName - the name of the column containing the serialized token
Throws:
SQLException - if the token cannot be read from the entry
protected Connection getConnection()
Returns a Connection to the database.
Copyright © 2010–2023. All rights reserved.