Interface TokenStore
- All Known Implementing Classes:
InMemoryTokenStore, JdbcTokenStore, JpaTokenStore
tracking tokens.
A StreamingEventProcessor that is tracking an event
stream can use the store to keep track of its position in the event stream. Tokens are stored by process name and
segment index, enabling the same processor to be distributed over multiple processes or machines.
- Since:
- 3.0.0
- Author:
- Allard Buijze, Rene de Waele
-
Method Summary
Modifier and Type, Method, Description (one entry per method, description indented below the signature):
CompletableFuture<Void> deleteToken(String processorName, int segmentId, ProcessingContext context)
    Deletes the token associated with the specified processorName and segmentId.
default CompletableFuture<Void> extendClaim(String processorName, int segmentId, ProcessingContext context)
    Extends the claim on the current token held by this node for the given processorName and segment.
CompletableFuture<List<Segment>> fetchAvailableSegments(String processorName, ProcessingContext context)
    Returns a CompletableFuture supplying a List of known available Segments for a given processorName.
CompletableFuture<Segment> fetchSegment(String processorName, int segmentId, ProcessingContext context)
    Returns a CompletableFuture that supplies the specified Segment, or null if there was no such segment.
CompletableFuture<List<Segment>> fetchSegments(String processorName, ProcessingContext context)
    Returns a CompletableFuture that supplies a list of known segments for a given processorName on completion.
CompletableFuture<TrackingToken> fetchToken(String processorName, int segmentId, ProcessingContext context)
    Fetches the last stored token for the given processorName and segmentId.
default CompletableFuture<TrackingToken> fetchToken(String processorName, Segment segment, ProcessingContext context)
    Fetches the last stored token for the given processorName and segment.
CompletableFuture<Void> initializeSegment(TrackingToken token, String processorName, Segment segment, ProcessingContext context)
    Initializes a segment with given segment for the processor with given processorName to contain the given token.
CompletableFuture<List<Segment>> initializeTokenSegments(String processorName, int segmentCount, TrackingToken initialToken, ProcessingContext context)
    Initializes a given number of segments for the given processorName to track its tokens.
CompletableFuture<Void> releaseClaim(String processorName, int segmentId, ProcessingContext context)
    Release a claim of the token for given processorName and segment.
retrieveStorageIdentifier(ProcessingContext context)
    Retrieves the storage identifier associated with this store.
CompletableFuture<Void> storeToken(TrackingToken token, String processorName, int segmentId, ProcessingContext context)
    Stores the given token in the store.
-
Method Details
-
initializeTokenSegments
@Nonnull CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context)
Initializes a given number of segments for the given processorName to track its tokens.
Returns a CompletableFuture that completes when the segments were successfully initialized. The returned future will complete exceptionally with an UnableToClaimTokenException when a segment to be initialized already exists.
This method should only be invoked when no tokens have yet been stored for the given processor.
This method will store the given initialToken for all segments as the starting point for the processor with the given processorName, but not claim them. It will create the segments ranging from 0 through segmentCount - 1.
The exact behavior when this method is called while tokens were already present is undefined if the token already present is not owned by the initializing process.
- Parameters:
processorName - The name of the processor to initialize segments for.
segmentCount - The number of segments to initialize.
initialToken - The initial token which is used as a starting point for the processor.
context - The processing context to use when initializing the segments, if any.
- Returns:
- A CompletableFuture that completes when the segments have been initialized.
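The segment range and the failure case described above can be illustrated with a minimal, self-contained sketch. It is not the Axon implementation: the class `SegmentInitSketch`, the string key format, the use of a `long` in place of a `TrackingToken`, and `IllegalStateException` in place of `UnableToClaimTokenException` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a token store; not the Axon implementation.
class SegmentInitSketch {

    // Maps "processorName/segmentId" to a position that stands in for a TrackingToken.
    private final Map<String, Long> tokens = new ConcurrentHashMap<>();

    // Creates segments 0 through segmentCount - 1, each holding initialToken, without claiming them.
    CompletableFuture<List<Integer>> initializeTokenSegments(String processorName,
                                                             int segmentCount,
                                                             long initialToken) {
        List<Integer> created = new ArrayList<>();
        for (int segmentId = 0; segmentId < segmentCount; segmentId++) {
            String key = processorName + "/" + segmentId;
            if (tokens.putIfAbsent(key, initialToken) != null) {
                // An already existing segment fails the returned future instead of throwing.
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Segment already exists: " + key));
            }
            created.add(segmentId);
        }
        return CompletableFuture.completedFuture(created);
    }

    public static void main(String[] args) {
        SegmentInitSketch store = new SegmentInitSketch();
        System.out.println(store.initializeTokenSegments("orders", 4, 0L).join());
        // The second call finds segment 0 already present, so its future completes exceptionally.
        System.out.println(store.initializeTokenSegments("orders", 4, 0L).isCompletedExceptionally());
    }
}
```

Because the failure is delivered through the future, callers would typically inspect it with `handle` or `exceptionally` rather than a `try`/`catch` around the call itself.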
-
storeToken
@Nonnull CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Stores the given token in the store.
Returns a CompletableFuture that completes when the token was successfully stored. The returned future will complete exceptionally with an UnableToClaimTokenException if the token was not initialized, or was claimed by another process.
The token marks the current position of the process with given processorName and segment. The given token may be null. Any claims made by the current process have their timestamp updated.
- Parameters:
token - The token to store for a given process and segment. May be null.
processorName - The name of the process for which to store the token.
segmentId - The index of the segment for which to store the token.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the token has been stored.
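The two documented failure cases (token not initialized, token claimed by another process) can be sketched as follows. This is a simplified model under stated assumptions, not the library's code: `StoreTokenSketch`, its `initialize` helper, the string keys, the `long` positions and the use of `IllegalStateException` are all hypothetical. The source does not say what happens when the token exists but is unclaimed, so the sketch simply accepts the store in that case.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; the real store signals these failures with UnableToClaimTokenException.
class StoreTokenSketch {

    private final Map<String, String> owners = new HashMap<>();   // key -> owning node, null while unclaimed
    private final Map<String, Long> positions = new HashMap<>();  // key -> last stored position

    synchronized void initialize(String key, String ownerOrNull) {
        owners.put(key, ownerOrNull);
    }

    // Stores the position unless the token is missing or claimed by a different node.
    synchronized CompletableFuture<Void> storeToken(long position, String key, String nodeId) {
        if (!owners.containsKey(key)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Token not initialized: " + key));
        }
        String owner = owners.get(key);
        if (owner != null && !owner.equals(nodeId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Token " + key + " is claimed by " + owner));
        }
        // Assumption: an unclaimed token is accepted; the source does not specify that case.
        positions.put(key, position);
        return CompletableFuture.completedFuture(null);
    }

    synchronized Long position(String key) {
        return positions.get(key);
    }

    public static void main(String[] args) {
        StoreTokenSketch store = new StoreTokenSketch();
        store.initialize("orders/0", "node-a");
        store.storeToken(42L, "orders/0", "node-a").join();
        // The caller reacts to the exceptional completion instead of catching a thrown exception.
        String outcome = store.storeToken(43L, "orders/0", "node-b")
                              .handle((ok, error) -> error == null ? "stored" : "rejected")
                              .join();
        System.out.println(outcome + ", position=" + store.position("orders/0"));
    }
}
```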
-
fetchToken
@Nonnull CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Fetches the last stored token for the given processorName and segmentId.
Returns a CompletableFuture that completes with the fetched token, or null if the token was null. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use releaseClaim(String, int, ProcessingContext).
- Parameters:
processorName - The process name for which to fetch the token.
segmentId - The segment index for which to fetch the token.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with the last stored TrackingToken, or null if the store holds no token for the given process and segment.
-
fetchToken
@Nonnull default CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context)
Fetches the last stored token for the given processorName and segment.
Returns a CompletableFuture that completes with the fetched token, or null if the token was null. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use releaseClaim(String, int, ProcessingContext).
- Parameters:
processorName - The process name for which to fetch the token.
segment - The segment for which to fetch the token.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with the last stored token, or null if the store holds no token for the given process and segment.
-
extendClaim
@Nonnull default CompletableFuture<Void> extendClaim(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Extends the claim on the current token held by this node for the given processorName and segment.
Returns a CompletableFuture that completes when the claim has been successfully extended. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.
- Parameters:
processorName - The process name for which to extend the claim.
segmentId - The segment index for which to extend the claim.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the claim extension has been completed.
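One way to picture claim extension is as a timestamp refresh on a claim record. The sketch below assumes that a claim goes stale after a fixed timeout; that timeout, the class `ClaimExtensionSketch`, its `claim` and `isStale` helpers, and the explicit `now` parameter are illustrative assumptions and not part of the TokenStore interface. The sketch also rejects an extension when no claim is held at all, which is a simplification.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of timestamped claims; the timeout is an assumption, not part of the interface.
class ClaimExtensionSketch {

    static final class Claim {
        final String owner;
        final Instant timestamp;

        Claim(String owner, Instant timestamp) {
            this.owner = owner;
            this.timestamp = timestamp;
        }
    }

    private final Map<String, Claim> claims = new ConcurrentHashMap<>();
    private final Duration claimTimeout;

    ClaimExtensionSketch(Duration claimTimeout) {
        this.claimTimeout = claimTimeout;
    }

    void claim(String key, String nodeId, Instant now) {
        claims.put(key, new Claim(nodeId, now));
    }

    // Refreshes the claim timestamp; fails the future when nodeId does not hold the claim.
    CompletableFuture<Void> extendClaim(String key, String nodeId, Instant now) {
        Claim current = claims.get(key);
        if (current == null || !current.owner.equals(nodeId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("No claim held by " + nodeId + " on " + key));
        }
        claims.put(key, new Claim(nodeId, now));
        return CompletableFuture.completedFuture(null);
    }

    // A claim counts as stale once claimTimeout has passed since its last timestamp.
    boolean isStale(String key, Instant now) {
        Claim current = claims.get(key);
        return current == null || current.timestamp.plus(claimTimeout).isBefore(now);
    }
}
```

Passing the current time as a parameter keeps the sketch deterministic; a real store would read a clock instead.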
-
releaseClaim
@Nonnull CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Releases a claim of the token for the given processorName and segment. If no such claim existed, nothing happens.
The caller must stop using any streams opened based on the token for which the claim is released.
- Parameters:
processorName - The name of the process owning the token (e.g. a PooledStreamingEventProcessor name).
segmentId - The segment for which a token was obtained.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the claim release has been completed.
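The interplay between claiming a token and releasing it can be sketched with a small ownership map. The model below tracks only who holds the claim, not the token itself; `ClaimReleaseSketch`, its `claim` method (standing in for the claiming side effect of fetchToken) and the explicit `nodeId` parameter are hypothetical names chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical claim bookkeeping; models ownership only, not the stored token.
class ClaimReleaseSketch {

    // key -> owning node; an absent key means the token is unclaimed.
    private final Map<String, String> claims = new ConcurrentHashMap<>();

    // Claims the token for nodeId, failing the future when another node holds it.
    CompletableFuture<Void> claim(String key, String nodeId) {
        String owner = claims.putIfAbsent(key, nodeId);
        if (owner != null && !owner.equals(nodeId)) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("Token " + key + " is claimed by " + owner));
        }
        return CompletableFuture.completedFuture(null);
    }

    // Removes the claim only when nodeId holds it; otherwise nothing happens.
    CompletableFuture<Void> releaseClaim(String key, String nodeId) {
        claims.remove(key, nodeId);
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        ClaimReleaseSketch store = new ClaimReleaseSketch();
        store.claim("orders/0", "node-a").join();
        System.out.println(store.claim("orders/0", "node-b").isCompletedExceptionally()); // true
        store.releaseClaim("orders/0", "node-a").join();
        System.out.println(store.claim("orders/0", "node-b").isCompletedExceptionally()); // false
    }
}
```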
-
initializeSegment
@Nonnull CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context)
Initializes a segment with given segment for the processor with given processorName to contain the given token.
Returns a CompletableFuture that completes when the token segment has been successfully initialized. The returned future will complete exceptionally with an UnableToInitializeTokenException if the token already exists.
This method fails if a token already exists for the given processor and segment, even if that token has been claimed by the active instance. This method will not claim the initialized segment. Use fetchToken(String, int, ProcessingContext) to retrieve and claim the token.
- Parameters:
token - The token to initialize the segment with.
processorName - The name of the processor to create the segment for.
segment - The segment to initialize.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the segment has been initialized.
- Throws:
UnsupportedOperationException - If this implementation does not support explicit initialization.
-
deleteToken
@Nonnull CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Deletes the token associated with the specified processorName and segmentId.
Returns a CompletableFuture that completes when the token has been successfully deleted. The returned future will complete exceptionally with an UnableToClaimTokenException if the token is not currently claimed by this node.
The token must be owned by the current process (JVM instance) to be able to delete it.
- Parameters:
processorName - The name of the processor to remove the token for.
segmentId - The segment to delete.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the token deletion is complete.
- Throws:
UnsupportedOperationException - If this operation is not supported by this implementation.
-
fetchSegment
@Nonnull CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Returns a CompletableFuture that supplies the specified Segment, or null if there was no such segment.
- Parameters:
processorName - The process name for which to fetch the segment.
segmentId - The segment index to fetch.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that supplies the segment, or null, on completion.
-
fetchSegments
@Nonnull CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context)
Returns a CompletableFuture that supplies a list of known segments for a given processorName on completion.
The segments returned are segments for which a token has been stored previously. When the TokenStore is empty, the CompletableFuture will supply an empty list.
- Parameters:
processorName - The process name for which to fetch the segments.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with a list of segments on completion.
-
fetchAvailableSegments
@Nonnull CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context)
Returns a CompletableFuture supplying a List of known available Segments for a given processorName.
A segment is considered available if it is not claimed by any other event processor.
The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the TokenStore is empty, an empty list is returned.
- Parameters:
processorName - The processor's name for which to fetch the segments.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture supplying a List of available Segments for the specified processorName.
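The availability rule above (a segment is available unless another processor instance has claimed it) amounts to a filter over the stored segments. The following sketch assumes a single processor name and represents segments by their integer ids; `AvailableSegmentsSketch`, its `put` helper and the explicit `nodeId` argument are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical filter over stored segments for one processor; not the Axon implementation.
class AvailableSegmentsSketch {

    // segmentId -> owning node; a null value marks a stored but unclaimed segment.
    private final Map<Integer, String> ownersBySegment = new TreeMap<>();

    void put(int segmentId, String ownerOrNull) {
        ownersBySegment.put(segmentId, ownerOrNull);
    }

    // Supplies the segments that are unclaimed or already claimed by nodeId itself.
    CompletableFuture<List<Integer>> fetchAvailableSegments(String nodeId) {
        List<Integer> available = new ArrayList<>();
        for (Map.Entry<Integer, String> entry : ownersBySegment.entrySet()) {
            String owner = entry.getValue();
            if (owner == null || owner.equals(nodeId)) {
                available.add(entry.getKey());
            }
        }
        return CompletableFuture.completedFuture(available);
    }

    public static void main(String[] args) {
        AvailableSegmentsSketch store = new AvailableSegmentsSketch();
        store.put(0, null);      // unclaimed
        store.put(1, "node-b");  // claimed by another node
        store.put(2, "node-a");  // claimed by the caller
        System.out.println(store.fetchAvailableSegments("node-a").join()); // [0, 2]
    }
}
```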
-
retrieveStorageIdentifier
Retrieves the storage identifier associated with this store. The returned identifier uniquely identifies the storage location for the tokens in this store.
Returns a CompletableFuture that completes with the storage identifier associated with this store. The returned future will complete exceptionally with an UnableToRetrieveIdentifierException if the identifier could not be retrieved.
Two token store implementations that share state must return the same identifier. Two token store implementations that do not share a location must return a different identifier (or an empty optional if identifiers are not supported).
Note that this method may require the implementation to consult its underlying storage. Therefore, a transaction should be active when this method is called, similarly to invocations like fetchToken(String, int, ProcessingContext), fetchSegments(String, ProcessingContext), etc. When no transaction is active, the behavior is undefined.
- Parameters:
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that provides an identifier to uniquely identify the storage location of tokens in this TokenStore on completion.
-