public class JdbcTokenStore extends Object implements TokenStore
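TokenStore implementation that uses JDBC to store tokens. The database must contain the table named by TokenSchema.tokenTable()
in which to store the tokens.

Constructor and Description |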
---|
JdbcTokenStore(ConnectionProvider connectionProvider, Serializer serializer): Initializes the JdbcTokenStore with the given resources. |
JdbcTokenStore(ConnectionProvider connectionProvider, Serializer serializer, TokenSchema schema, TemporalAmount claimTimeout, String nodeId, Class<?> contentType): Initializes the JdbcTokenStore with the given resources. |
Modifier and Type | Method and Description |
---|---|
protected TrackingToken | claimToken(ResultSet resultSet, AbstractTokenEntry<?> entry): Tries to claim the given token entry. |
void | createSchema(TokenTableFactory schemaFactory): Performs the DDL queries to create the schema necessary for this token store implementation. |
int[] | fetchSegments(String processorName): Returns an array of known segments for a given processorName. |
TrackingToken | fetchToken(String processorName, int segment) |
protected Connection | getConnection(): Returns a Connection to the database. |
void | initializeTokenSegments(String processorName, int segmentCount): Initializes the given segmentCount number of segments for the given processorName to track its tokens. |
protected void | insertOrUpdateToken(ResultSet resultSet, TrackingToken token, String processorName, int segment): If the given resultSet has no items this method should insert a new token entry. |
protected TrackingToken | insertTokenEntry(ResultSet resultSet, TrackingToken token, String processorName, int segment): Inserts a new token entry via the given updatable resultSet. |
protected TrackingToken | loadOrInsertToken(ResultSet resultSet, String processorName, int segment): Tries loading an existing token owned by a processor with given processorName and segment. |
protected <T> T | readSerializedData(ResultSet resultSet, String columnName): Returns the serialized token data from the given resultSet at given columnName. |
protected AbstractTokenEntry<?> | readTokenEntry(ResultSet resultSet): Converts the given resultSet to an AbstractTokenEntry. |
protected PreparedStatement | releaseClaim(Connection connection, String processorName, int segment): Creates a new PreparedStatement to release the current claim this node has on a token belonging to a processor with given processorName and segment. |
void | releaseClaim(String processorName, int segment): Releases a claim of the token for given processorName and segment. |
protected PreparedStatement | selectForSegments(Connection connection, String processorName): Returns a PreparedStatement to select all segment ids for a given processorName from the underlying storage. |
protected PreparedStatement | selectForUpdate(Connection connection, String processorName, int segment): Returns a PreparedStatement to select a token entry from the underlying storage. |
void | storeToken(TrackingToken token, String processorName, int segment): Stores the given token in the store. |
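The protected selectForUpdate hook listed above is expected to produce a statement whose ResultSet is updatable. A minimal sketch of how such a statement can be created with plain JDBC follows. The class name, the SQL text, and the table and column names are assumptions made for illustration; they are not taken from TokenSchema or from the actual implementation.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical sketch; table and column names are assumptions, not TokenSchema defaults.
final class SelectForUpdateSketch {

    static final String SQL =
            "SELECT processorName, segment, token, owner, timestamp FROM TokenEntry "
            + "WHERE processorName = ? AND segment = ? FOR UPDATE";

    static PreparedStatement selectForUpdate(Connection connection, String processorName, int segment)
            throws SQLException {
        // CONCUR_UPDATABLE asks the driver for a ResultSet whose rows can be
        // updated or inserted in place once the statement is executed.
        PreparedStatement statement = connection.prepareStatement(
                SQL, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
        statement.setString(1, processorName);
        statement.setInt(2, segment);
        return statement;
    }
}
```

Whether a driver honours the updatable concurrency mode for a given query depends on the database, so an overriding implementation may need different SQL.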
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface TokenStore: extendClaim

public JdbcTokenStore(ConnectionProvider connectionProvider, Serializer serializer)
Initializes the JdbcTokenStore with the given resources. The claimTimeout is used to 'steal' any claim
that has not been updated since that amount of time. The token is serialized to a byte array.
Parameters:
connectionProvider - The provider of connections to the underlying database
serializer - The serializer to serialize tokens with

public JdbcTokenStore(ConnectionProvider connectionProvider, Serializer serializer, TokenSchema schema, TemporalAmount claimTimeout, String nodeId, Class<?> contentType)
Initializes the JdbcTokenStore with the given resources. The given claimTimeout is used to 'steal' any claim
that has not been updated since that amount of time.
Parameters:
connectionProvider - The provider of connections to the underlying database
serializer - The serializer to serialize tokens with
schema - The schema that describes a JDBC token entry
claimTimeout - The timeout after which this process will force a claim
nodeId - The identifier to identify ownership of the tokens
contentType - The data type of the serialized token

public void createSchema(TokenTableFactory schemaFactory)
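Performs the DDL queries to create the schema necessary for this token store implementation.
Parameters:
schemaFactory - factory of the token entry schema
Throws:
EventStoreException - when an error occurs executing SQL statements

public void initializeTokenSegments(String processorName, int segmentCount) throws UnableToClaimTokenException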
Description copied from interface: TokenStore
Initializes the given segmentCount number of segments for the given processorName to track its tokens. This method should only be invoked when no tokens have been stored for the given processor yet.
This method will initialize the tokens, but not claim them. It will create the segments ranging from 0 through segmentCount - 1 (inclusive).
If tokens are already present and are not owned by the initializing process, the behavior of this method is undefined.
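A minimal in-memory sketch of the contract described above, not the JDBC implementation: it creates one unclaimed entry with a null token for each segment from 0 through segmentCount - 1. The class SegmentInitSketch, its map layout, the segments accessor, and the use of IllegalStateException in place of UnableToClaimTokenException are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in; not part of the token store API.
final class SegmentInitSketch {

    // processorName -> (segment index -> token); a null token means nothing has been stored yet.
    private final Map<String, Map<Integer, Object>> entries = new TreeMap<>();

    void initializeTokenSegments(String processorName, int segmentCount) {
        Map<Integer, Object> segments = entries.computeIfAbsent(processorName, k -> new TreeMap<>());
        for (int segment = 0; segment < segmentCount; segment++) {
            if (segments.containsKey(segment)) {
                // Stands in for UnableToClaimTokenException.
                throw new IllegalStateException(
                        "Segment " + segment + " already exists for " + processorName);
            }
            segments.put(segment, null); // initialized, but not claimed
        }
    }

    // Returns the known segment indexes in ascending order, or an empty array.
    int[] segments(String processorName) {
        Map<Integer, Object> known = entries.get(processorName);
        if (known == null) {
            return new int[0];
        }
        return known.keySet().stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        SegmentInitSketch store = new SegmentInitSketch();
        store.initializeTokenSegments("myProcessor", 4);
        System.out.println(Arrays.toString(store.segments("myProcessor")));
    }
}
```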
Specified by: initializeTokenSegments in interface TokenStore
Parameters:
processorName - The name of the processor to initialize segments for
segmentCount - The number of segments to initialize
Throws:
UnableToClaimTokenException - when a segment has already been created

public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException
Description copied from interface: TokenStore
Stores the given token in the store. The token marks the current position of the process with given processorName and segment. The given token may be null.
Any claims made by the current process have their timestamp updated.
Specified by: storeToken in interface TokenStore
Parameters:
token - The token to store for a given process and segment. May be null.
processorName - The name of the process for which to store the token
segment - The index of the segment for which to store the token
Throws:
UnableToClaimTokenException - when the token being updated has been claimed by another process.

public TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException
Description copied from interface: TokenStore
Returns the token stored for the given processorName and segment. Returns null if the store holds no token or if the stored token for the given process and segment is null.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int).
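The claim behavior described above, combined with the claimTimeout passed to the constructor, can be pictured with a small in-memory model of a single token entry. This is a sketch under stated assumptions, not the store's actual logic: the class ClaimSketch, the strict "older than the timeout" comparison, and the use of IllegalStateException in place of UnableToClaimTokenException are all hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;

// Hypothetical in-memory model of one token entry; not the JDBC implementation.
final class ClaimSketch {

    private Object token;      // may be null
    private String owner;      // null when the entry is unclaimed
    private Instant timestamp; // when the claim was last made or refreshed

    // True when nodeId may take the claim at time 'now'.
    boolean mayClaim(String nodeId, Instant now, TemporalAmount claimTimeout) {
        return owner == null
                || owner.equals(nodeId)
                || timestamp.plus(claimTimeout).isBefore(now); // stale claim may be 'stolen'
    }

    Object fetchToken(String nodeId, Instant now, TemporalAmount claimTimeout) {
        if (!mayClaim(nodeId, now, claimTimeout)) {
            // Stands in for UnableToClaimTokenException.
            throw new IllegalStateException("Token is claimed by " + owner);
        }
        owner = nodeId;
        timestamp = now;
        return token;
    }

    void releaseClaim(String nodeId) {
        if (nodeId.equals(owner)) {
            owner = null; // if this node holds no claim, nothing happens
        }
    }

    public static void main(String[] args) {
        ClaimSketch entry = new ClaimSketch();
        Instant start = Instant.now();
        Duration timeout = Duration.ofSeconds(10);
        entry.fetchToken("nodeA", start, timeout);
        System.out.println(entry.mayClaim("nodeB", start.plusSeconds(5), timeout));
        System.out.println(entry.mayClaim("nodeB", start.plusSeconds(11), timeout));
    }
}
```

Passing the current time as a parameter keeps the rule easy to test; a real store would read the clock itself.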
Specified by: fetchToken in interface TokenStore
Parameters:
processorName - The process name for which to fetch the token
segment - The segment index for which to fetch the token
Returns: the stored token, or null if the store holds no token for given process and segment
Throws:
UnableToClaimTokenException - if there is a token for given processorName and segment, but it is claimed by another process.

public void releaseClaim(String processorName, int segment)
Description copied from interface: TokenStore
Releases a claim of the token for given processorName and segment. If no such claim existed, nothing happens.
The caller must not use any streams opened based on the token for which the claim is released.
Specified by: releaseClaim in interface TokenStore
Parameters:
processorName - The name of the process owning the token (e.g. a TrackingEventProcessor name)
segment - the segment for which a token was obtained

public int[] fetchSegments(String processorName)
Description copied from interface: TokenStore
Returns an array of known segments for a given processorName.
The segments returned are segments for which a token has been stored previously. When the TokenStore is empty, an empty array is returned.
Specified by: fetchSegments in interface TokenStore
Parameters:
processorName - The process name for which to fetch the segments

protected PreparedStatement selectForSegments(Connection connection, String processorName) throws SQLException
Returns a PreparedStatement to select all segment ids for a given processorName from the underlying storage.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to fetch the segments for
Returns: a PreparedStatement that will fetch segments when executed
Throws:
SQLException - when an exception occurs while creating the prepared statement

protected PreparedStatement selectForUpdate(Connection connection, String processorName, int segment) throws SQLException
Returns a PreparedStatement to select a token entry from the underlying storage. The ResultSet that is returned when this statement is executed should be updatable.
Parameters:
connection - the connection to the underlying database
processorName - the name of the processor to fetch the entry for
segment - the segment of the processor to fetch the entry for
Returns: a PreparedStatement that will fetch an updatable token entry when executed
Throws:
SQLException - when an exception occurs while creating the prepared statement

protected void insertOrUpdateToken(ResultSet resultSet, TrackingToken token, String processorName, int segment) throws SQLException
If the given resultSet has no items this method should insert a new token entry. If a token already exists, it should attempt to replace the token in the entry with the given token and claim ownership.
Parameters:
resultSet - the updatable query result set of an executed PreparedStatement
token - the token for the new or updated entry
processorName - the name of the processor owning the token
segment - the segment of the processor owning the token
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token
SQLException - when an exception occurs while updating the result set

protected TrackingToken claimToken(ResultSet resultSet, AbstractTokenEntry<?> entry) throws SQLException
Tries to claim the given token entry. If the claim fails an UnableToClaimTokenException should be thrown. Otherwise the given resultSet should be updated to reflect the claim.
Parameters:
resultSet - the updatable query result of an executed PreparedStatement
entry - the entry extracted from the given result set
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token
SQLException - when an exception occurs while claiming the token entry

protected TrackingToken loadOrInsertToken(ResultSet resultSet, String processorName, int segment) throws SQLException
Tries loading an existing token owned by a processor with given processorName and segment. If such a token entry exists an attempt will be made to claim the token. If that succeeds the token will be returned. If the token is already owned by another node an UnableToClaimTokenException will be thrown.
If no such token exists yet, a new token entry with a null token, owned by this node, is inserted and null is returned.
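The load-or-insert flow above can be sketched against a plain updatable java.sql.ResultSet. This is an illustration under assumptions rather than the actual implementation: the column names, the byte[] return type (standing in for a deserialized TrackingToken), the nodeId and now parameters, and the use of IllegalStateException in place of UnableToClaimTokenException are hypothetical, and the claim timeout is ignored for brevity.

```java
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;

// Hypothetical sketch; column names and exception type are assumptions.
final class LoadOrInsertSketch {

    static byte[] loadOrInsert(ResultSet resultSet, String processorName, int segment,
                               String nodeId, Instant now) throws SQLException {
        if (resultSet.next()) {
            String owner = resultSet.getString("owner");
            if (owner != null && !owner.equals(nodeId)) {
                // Stands in for UnableToClaimTokenException.
                throw new IllegalStateException("Token is owned by " + owner);
            }
            byte[] token = resultSet.getBytes("token");
            // Claim the existing entry by updating the current row in place.
            resultSet.updateString("owner", nodeId);
            resultSet.updateTimestamp("timestamp", Timestamp.from(now));
            resultSet.updateRow();
            return token;
        }
        // No entry yet: insert one with a null token owned by this node.
        resultSet.moveToInsertRow();
        resultSet.updateString("processorName", processorName);
        resultSet.updateInt("segment", segment);
        resultSet.updateNull("token");
        resultSet.updateString("owner", nodeId);
        resultSet.updateTimestamp("timestamp", Timestamp.from(now));
        resultSet.insertRow();
        return null;
    }
}
```

The result set passed in must come from a statement created with an updatable concurrency mode; otherwise the update and insert calls fail with an SQLException.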
Parameters:
resultSet - the updatable result set from a prior select for update query
processorName - the name of the processor to load or insert a token entry for
segment - the segment of the processor to load or insert a token entry for
Returns: the token of the claimed entry, or null if a new entry was inserted
Throws:
UnableToClaimTokenException - if the token cannot be claimed because another node currently owns the token
SQLException - when an exception occurs while loading or inserting the entry

protected TrackingToken insertTokenEntry(ResultSet resultSet, TrackingToken token, String processorName, int segment) throws SQLException
Inserts a new token entry via the given updatable resultSet.
Parameters:
resultSet - the updatable result set to add the entry to
token - the token of the entry to insert
processorName - the name of the processor to insert a token for
segment - the segment of the processor to insert a token for
Throws:
SQLException - when an exception occurs while inserting a token entry

protected AbstractTokenEntry<?> readTokenEntry(ResultSet resultSet) throws SQLException
Converts the given resultSet to an AbstractTokenEntry. The result set contains a single token entry.
Parameters:
resultSet - the result set of a prior select statement containing a single token entry
Throws:
SQLException - if the result set cannot be converted to an entry

protected PreparedStatement releaseClaim(Connection connection, String processorName, int segment) throws SQLException
Creates a new PreparedStatement to release the current claim this node has on a token belonging to a processor with given processorName and segment.
Parameters:
connection - the connection that should be used to create a PreparedStatement
processorName - the name of the processor for which to release this node's claim
segment - the segment of the processor for which to release this node's claim
Returns: a PreparedStatement that will release the claim this node has on the token entry
Throws:
SQLException - if the statement to release a claim cannot be created

protected <T> T readSerializedData(ResultSet resultSet, String columnName) throws SQLException
Returns the serialized token data from the given resultSet at given columnName.
Type Parameters:
T - the type of data to return
Parameters:
resultSet - the result set to get serialized data from
columnName - the name of the column containing the serialized token
Throws:
SQLException - if the token cannot be read from the entry

protected Connection getConnection()
Returns a Connection to the database.

Copyright © 2010–2018. All rights reserved.