public interface TokenStore
EventProcessor
that is
tracking an event stream can use the store to keep track of its position in the event stream. Tokens are stored by
process name and segment index, enabling the same processor to be distributed over multiple processes or machines.Modifier and Type | Method and Description |
---|---|
default void |
deleteToken(String processorName,
int segment)
Deletes the token for the processor with given
processorName and segment . |
default void |
extendClaim(String processorName,
int segment)
Extends the claim on the current token held by the this node for the given
processorName and
segment . |
int[] |
fetchSegments(String processorName)
Returns an array of known
segments for a given processorName . |
TrackingToken |
fetchToken(String processorName,
int segment)
|
default void |
initializeSegment(TrackingToken token,
String processorName,
int segment)
Initializes a segment with given
segment for the processor with given processorName to contain
the given token . |
default void |
initializeTokenSegments(String processorName,
int segmentCount)
Initializes the given
segmentCount number of segments for the given processorName to track its
tokens. |
default void |
initializeTokenSegments(String processorName,
int segmentCount,
TrackingToken initialToken)
Initializes the given
segmentCount number of segments for the given processorName to track its
tokens. |
void |
releaseClaim(String processorName,
int segment)
Release a claim of the token for given
processorName and segment . |
default boolean |
requiresExplicitSegmentInitialization()
Indicates whether this TokenStore instance requires segments to be explicitly initialized, before any tokens
can be claimed for that segment.
|
default Optional<String> |
retrieveStorageIdentifier()
Returns a unique identifier that uniquely identifies the storage location of the tokens in this store.
|
void |
storeToken(TrackingToken token,
String processorName,
int segment)
Stores the given
token in the store. |
default void initializeTokenSegments(String processorName, int segmentCount) throws UnableToClaimTokenException
segmentCount
number of segments for the given processorName
to track its
tokens. This method should only be invoked when no tokens have been stored for the given processor, yet.
This method will initialize the tokens, but not claim them. It will create the segments ranging from 0
until segmentCount - 1
.
The exact behavior when this method is called while tokens were already present, is undefined in case the token already present is not owned by the initializing process.
processorName
- The name of the processor to initialize segments forsegmentCount
- The number of segments to initializeUnableToClaimTokenException
- when a segment has already been createddefault void initializeTokenSegments(String processorName, int segmentCount, TrackingToken initialToken) throws UnableToClaimTokenException
segmentCount
number of segments for the given processorName
to track its
tokens. This method should only be invoked when no tokens have been stored for the given processor, yet.
This method will store initialToken
for all segments as starting point for processor, but not claim them.
It will create the segments ranging from 0
until segmentCount - 1
.
The exact behavior when this method is called while tokens were already present, is undefined in case the token already present is not owned by the initializing process.
processorName
- The name of the processor to initialize segments forsegmentCount
- The number of segments to initializeinitialToken
- The initial token which is used as a starting point for processorUnableToClaimTokenException
- when a segment has already been createdvoid storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException
token
in the store. The token marks the current position of the process with given
processorName
and segment
. The given token
may be null
.
Any claims made by the current process have their timestamp updated.
This method should throw an UnableToClaimTokenException
when the given segment
has not been
initialized with a Token (albeit null
) yet. In that case, a segment must have been explicitly initialized.
A TokenStore implementation's ability to do so is exposed by the requiresExplicitSegmentInitialization()
method. If that method returns false, this method may implicitly initialize a token and return that token upon
invocation.
token
- The token to store for a given process and segment. May be null
.processorName
- The name of the process for which to store the tokensegment
- The index of the segment for which to store the tokenUnableToClaimTokenException
- when the token being updated has been claimed by another process.TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException
token
for the given processorName
and segment
.
Returns null
if the stored token for the given process and segment is
null
.
This method should throw an UnableToClaimTokenException
when the given segment
has not been
initialized with a Token (albeit null
) yet. In that case, a segment must have been explicitly initialized.
A TokenStore implementation's ability to do so is exposed by the requiresExplicitSegmentInitialization()
method. If that method returns false, this method may implicitly initialize a token and return that token upon
invocation.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release
the claim, use releaseClaim(String, int)
processorName
- The process name for which to fetch the tokensegment
- The segment index for which to fetch the tokennull
if the store holds no token for given process and segmentUnableToClaimTokenException
- if there is a token for given processorName
and segment
, but
they are claimed by another process.default void extendClaim(String processorName, int segment) throws UnableToClaimTokenException
processorName
and
segment
.processorName
- The process name for which to fetch the tokensegment
- The segment index for which to fetch the tokenUnableToClaimTokenException
- if there is no token for given processorName
and segment
, or
if it has been claimed by another process.fetchToken(String, int)
, which also extends the claim if the
token is held. TokenStore implementations may choose to implement this method if they can provide a more efficient
way of extending this claim.void releaseClaim(String processorName, int segment)
processorName
and segment
. If no such claim existed,
nothing happens.
The caller must ensure not to use any streams opened based on the token for which the claim is released.
processorName
- The name of the process owning the token (e.g. a TrackingEventProcessor name)segment
- the segment for which a token was obtaineddefault void initializeSegment(TrackingToken token, String processorName, int segment) throws UnableToInitializeTokenException
segment
for the processor with given processorName
to contain
the given token
.
This method fails if a Token already exists for the given processor and segment, even if that token has been claimed by the active instance.
This method will not claim the initialized segment. Use fetchToken(String, int)
to retrieve and claim
the token.
token
- The token to initialize the segment withprocessorName
- The name of the processor to create the segment forsegment
- The identifier of the segment to initializeUnableToInitializeTokenException
- if a Token already existsUnsupportedOperationException
- if this implementation does not support explicit initialization.
See requiresExplicitSegmentInitialization()
.default void deleteToken(String processorName, int segment) throws UnableToClaimTokenException
processorName
and segment
. The token must
be owned by the current node, to be able to delete it.
Implementations should implement this method only when requiresExplicitSegmentInitialization()
is overridden to
return true
. Deleting tokens using implementations that do not require explicit token initialization is
unsafe, as a claim will automatically recreate the deleted token instance, which may result in concurrency
issues.
processorName
- The name of the processor to remove the token forsegment
- The segment to deleteUnableToClaimTokenException
- if the token is not currently claimed by this nodeUnsupportedOperationException
- if this operation is not supported by this implementationdefault boolean requiresExplicitSegmentInitialization()
true
if this instance requires tokens to be explicitly initialized, otherwise false
.initializeTokenSegments(String, int)
,
initializeTokenSegments(String, int, TrackingToken)
,
initializeSegment(TrackingToken, String, int)
int[] fetchSegments(String processorName)
segments
for a given processorName
.
The segments returned are segments for which a token has been stored previously. When the TokenStore
is
empty, an empty array is returned.
processorName
- The process name for which to fetch the segmentsdefault Optional<String> retrieveStorageIdentifier() throws UnableToRetrieveIdentifierException
Note that this method may require the implementation to consult its underlying storage. Therefore, a Transaction
should be active when this method is called, similarly to invocations like fetchToken(String, int)
,
fetchSegments(String)
, etc. When no Transaction is active, the behavior is undefined.
UnableToRetrieveIdentifierException
- when the implementation was unable to determine its identifierCopyright © 2010–2020. All rights reserved.