Class InMemoryTokenStore
- All Implemented Interfaces:
TokenStore
TokenStore that stores tracking tokens in memory. This implementation is thread-safe.- Since:
- 3.0.0
- Author:
- Rene de Waele, Christophe Bouhier
-
Constructor Summary
ConstructorsConstructorDescriptionNo-arg constructor which will log a warning on initialization. -
Method Summary
Modifier and TypeMethodDescriptiondeleteToken(String processorName, int segment, ProcessingContext context) Deletes the token associated with the specifiedprocessorNameandsegmentId.fetchAvailableSegments(String processorName, ProcessingContext context) fetchSegment(String processorName, int segmentId, ProcessingContext context) Returns aCompletableFuturethat supplies the specifiedSegment, ornullif there was no such segment.fetchSegments(String processorName, ProcessingContext context) Returns aCompletableFuturethat supplies a list of knownsegmentsfor a givenprocessorNameon completion.fetchToken(String processorName, int segmentId, ProcessingContext context) initializeSegment(TrackingToken token, String processorName, Segment segment, ProcessingContext context) Initializes a segment with givensegmentfor the processor with givenprocessorNameto contain the giventoken.initializeTokenSegments(String processorName, int segmentCount, TrackingToken initialToken, ProcessingContext context) Initializes a given number of segments for the givenprocessorNameto track its tokens.releaseClaim(String processorName, int segment, ProcessingContext context) Release a claim of the token for givenprocessorNameandsegment.Retrieves the storage identifier associated with this store.storeToken(TrackingToken token, String processorName, int segmentId, ProcessingContext context) Stores the giventokenin the store.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.axonframework.messaging.eventhandling.processing.streaming.token.store.TokenStore
extendClaim, fetchToken
-
Constructor Details
-
InMemoryTokenStore
public InMemoryTokenStore()No-arg constructor which will log a warning on initialization.
-
-
Method Details
-
initializeTokenSegments
@Nonnull public CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context) throws UnableToClaimTokenException Description copied from interface:TokenStoreInitializes a given number of segments for the givenprocessorNameto track its tokens.Returns a
CompletableFuturethat completes when the segments were successfully initialized. The returned future will complete exceptionally with anUnableToClaimTokenExceptionwhen a segment to be initialized already exists.This method should only be invoked when no tokens have been stored for the given processor, yet.
This method will store the given
initialTokenfor all segments as the starting point for the processor with the givenprocessorName, but not claim them. It will create the segments ranging from0untilsegmentCount - 1.The exact behavior when this method is called while tokens were already present is undefined in case the token already present is not owned by the initializing process.
- Specified by:
initializeTokenSegmentsin interfaceTokenStore- Parameters:
processorName- The name of the processor to initialize segments for.segmentCount- The number of segments to initialize.initialToken- The initial token which is used as a starting point for the processor.context- The processing context to use when initializing the segments, if any.- Returns:
- A
CompletableFuturethat completes when the segments have been initialized - Throws:
UnableToClaimTokenException
-
storeToken
@Nonnull public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) Description copied from interface:TokenStoreStores the giventokenin the store.Returns a
CompletableFuturethat completes when the token was successfully stored. The returned future will complete exceptionally with anUnableToClaimTokenExceptionif the token was not initialized, or was claimed by another process.The token marks the current position of the process with given
processorNameandsegment. The giventokenmay benull. Any claims made by the current process have their timestamp updated.- Specified by:
storeTokenin interfaceTokenStore- Parameters:
token- The token to store for a given process and segment. May benull.processorName- The name of the process for which to store the token.segmentId- The index of the segment for which to store the token.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturethat completes when the token has been stored.
-
fetchToken
@Nonnull public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) Description copied from interface:TokenStoreFetches the last storedtokenfor the givenprocessorNameandsegmentId.Returns a
CompletableFuturethat completes with the fetched token, ornullif the token wasnull. The returned future will complete exceptionally with anUnableToClaimTokenExceptionif the token did not exist or was claimed by another process.The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use
TokenStore.releaseClaim(String, int, ProcessingContext)- Specified by:
fetchTokenin interfaceTokenStore- Parameters:
processorName- The process name for which to fetch the token.segmentId- The segment index for which to fetch the token.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturewith the last stored TrackingToken ornullif the store holds no token for the given process and segment.
-
releaseClaim
@Nonnull public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) Description copied from interface:TokenStoreRelease a claim of the token for givenprocessorNameandsegment. If no such claim existed, nothing happens.The caller must ensure not to use any streams opened based on the token for which the claim is released.
- Specified by:
releaseClaimin interfaceTokenStore- Parameters:
processorName- The name of the process owning the token (e.g. a PooledStreamingEventProcessor name).segment- The segment for which a token was obtained.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturethat completes when the claim-release has been completed.
-
deleteToken
@Nonnull public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context) throws UnableToClaimTokenException Description copied from interface:TokenStoreDeletes the token associated with the specifiedprocessorNameandsegmentId.Returns a
CompletableFuturethat completes when the token has been successfully deleted. The returned future will complete exceptionally with anUnableToClaimTokenExceptionif the token is not currently claimed by this node.The token must be owned by the current process (JVM instance) to be able to delete it.
- Specified by:
deleteTokenin interfaceTokenStore- Parameters:
processorName- The name of the processor to remove the token for.segment- The segment to delete.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturethat completes when the token deletion is complete. - Throws:
UnableToClaimTokenException
-
initializeSegment
@Nonnull public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context) throws UnableToInitializeTokenException Description copied from interface:TokenStoreInitializes a segment with givensegmentfor the processor with givenprocessorNameto contain the giventoken.Returns a
CompletableFuturethat completes when the token segment has been successfully initialized. The returned future will complete exceptionally with anUnableToInitializeTokenExceptionif the token already exists.This method fails if a token already exists for the given processor and segment, even if that token has been claimed by the active instance. This method will not claim the initialized segment. Use
TokenStore.fetchToken(String, int, ProcessingContext)to retrieve and claim the token.- Specified by:
initializeSegmentin interfaceTokenStore- Parameters:
token- The token to initialize the segment with.processorName- The name of the processor to create the segment for.segment- The segment to initialize.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturethat completes when the segment has been initialized. - Throws:
UnableToInitializeTokenException
-
fetchSegment
@Nonnull public CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context) Description copied from interface:TokenStoreReturns aCompletableFuturethat supplies the specifiedSegment, ornullif there was no such segment.- Specified by:
fetchSegmentin interfaceTokenStore- Parameters:
processorName- The process name for which to fetch the segment.segmentId- The segment index to fetchcontext- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturewith the segment, ornullon completion
-
fetchSegments
@Nonnull public CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context) Description copied from interface:TokenStoreReturns aCompletableFuturethat supplies a list of knownsegmentsfor a givenprocessorNameon completion.The segments returned are segments for which a token has been stored previously. When the
TokenStoreis empty, theCompletableFuturewill return an empty array.- Specified by:
fetchSegmentsin interfaceTokenStore- Parameters:
processorName- The process name for which to fetch the segments.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturewith a list of segments on completion.
-
fetchAvailableSegments
public CompletableFuture<List<Segment>> fetchAvailableSegments(String processorName, ProcessingContext context) Description copied from interface:TokenStoreReturns aCompletableFuturesupplying aListof known availableSegmentsfor a givenprocessorName.A segment is considered available if it is not claimed by any other event processor.
The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the
TokenStoreis empty, an empty list is returned.- Specified by:
fetchAvailableSegmentsin interfaceTokenStore- Parameters:
processorName- The processor's name for which to fetch the segments.context- The currentProcessingContext, if any.- Returns:
- A
Listof availableSegmentsfor the specifiedprocessorName.
-
retrieveStorageIdentifier
@Nonnull public CompletableFuture<String> retrieveStorageIdentifier(@Nullable ProcessingContext context) Description copied from interface:TokenStoreRetrieves the storage identifier associated with this store. The returned identifier uniquely identifies the storage location for the tokens in this store.Returns a
CompletableFuturethat completes with the storage identifier associated with this store. The returned future will complete exceptionally with anUnableToRetrieveIdentifierExceptionif the identifier could not be retrieved.Two token store implementations that share state must return the same identifier. Two token store implementations that do not share a location must return a different identifier (or an empty optional if identifiers are not supported).
Note that this method may require the implementation to consult its underlying storage. Therefore, a transaction should be active when this method is called, similarly to invocations like
TokenStore.fetchToken(String, int, ProcessingContext),TokenStore.fetchSegments(String, ProcessingContext), etc. When no transaction is active, the behavior is undefined.- Specified by:
retrieveStorageIdentifierin interfaceTokenStore- Parameters:
context- The currentProcessingContext, if any.- Returns:
- A
CompletableFuturethat provides an identifier to uniquely identify the storage location of tokens in thisTokenStoreon completion.
-