Class JpaTokenStore
- All Implemented Interfaces:
TokenStore
TokenEntry entities.
- Since:
- 3.0.0
- Author:
- Rene de Waele
-
Constructor Summary
Constructors
Constructor: JpaTokenStore(EntityManagerProvider entityManagerProvider, Converter converter, JpaTokenStoreConfiguration configuration)
Description: Instantiates a JpaTokenStore based on the fields contained in the JpaTokenStoreConfiguration.
-
Method Summary
Modifier and Type, Method: Description
Converter converter(): Returns the Converter used by the TokenStore to serialize tokens.
CompletableFuture<Void> deleteToken(String processorName, int segment, ProcessingContext context): Deletes the token associated with the specified processorName and segmentId.
CompletableFuture<Void> extendClaim(String processorName, int segment, ProcessingContext context): Extends the claim on the current token held by this node for the given processorName and segment.
CompletableFuture<List<Segment>> fetchAvailableSegments(String processorName, ProcessingContext context): Returns a CompletableFuture supplying a List of known available Segments for a given processorName.
CompletableFuture<Segment> fetchSegment(String processorName, int segmentId, ProcessingContext context): Returns a CompletableFuture that supplies the specified Segment, or null if there was no such segment.
CompletableFuture<List<Segment>> fetchSegments(String processorName, ProcessingContext context): Returns a CompletableFuture that supplies a list of known segments for a given processorName on completion.
CompletableFuture<TrackingToken> fetchToken(String processorName, int segment, ProcessingContext context): Fetches the last stored token for the given processorName and segmentId.
CompletableFuture<TrackingToken> fetchToken(String processorName, Segment segment, ProcessingContext context): Fetches the last stored token for the given processorName and segment.
CompletableFuture<Void> initializeSegment(TrackingToken token, String processorName, Segment segment, ProcessingContext context): Initializes the given segment for the processor with the given processorName to contain the given token.
CompletableFuture<List<Segment>> initializeTokenSegments(String processorName, int segmentCount, TrackingToken initialToken, ProcessingContext context): Initializes a given number of segments for the given processorName to track its tokens.
protected TokenEntry loadToken(String processorName, int segment, jakarta.persistence.EntityManager entityManager): Loads an existing TokenEntry or creates a new one using the given entityManager for the given processorName and segment.
protected TokenEntry loadToken(String processorName, Segment segment, jakarta.persistence.EntityManager entityManager): Tries loading an existing token owned by a processor with the given processorName and segment.
CompletableFuture<Void> releaseClaim(String processorName, int segment, ProcessingContext context): Releases a claim of the token for the given processorName and segment.
CompletableFuture<String> retrieveStorageIdentifier(ProcessingContext context): Retrieves the storage identifier associated with this store.
CompletableFuture<Void> storeToken(TrackingToken token, String processorName, int segment, ProcessingContext context): Stores the given token in the store.
-
Constructor Details
-
JpaTokenStore
public JpaTokenStore(@Nonnull EntityManagerProvider entityManagerProvider, @Nonnull Converter converter, @Nonnull JpaTokenStoreConfiguration configuration)
Instantiates a JpaTokenStore based on the fields contained in the JpaTokenStoreConfiguration.
Asserts that the EntityManagerProvider, Converter and JpaTokenStoreConfiguration are not null; otherwise an AxonConfigurationException is thrown.
- Parameters:
entityManagerProvider - The EntityManagerProvider used to obtain an EntityManager.
converter - The Converter used to serialize and deserialize tokens for storage.
configuration - The configuration for the JPA token store.
-
-
Method Details
-
initializeTokenSegments
@Nonnull public CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Initializes a given number of segments for the given processorName to track its tokens.
Returns a CompletableFuture that completes when the segments were successfully initialized. The returned future will complete exceptionally with an UnableToClaimTokenException when a segment to be initialized already exists.
This method should only be invoked when no tokens have been stored yet for the given processor.
This method will store the given initialToken for all segments as the starting point for the processor with the given processorName, but not claim them. It will create the segments ranging from 0 through segmentCount - 1.
The exact behavior when this method is called while tokens were already present is undefined in case the token already present is not owned by the initializing process.
- Specified by:
initializeTokenSegments in interface TokenStore
- Parameters:
processorName - The name of the processor to initialize segments for.
segmentCount - The number of segments to initialize.
initialToken - The initial token which is used as a starting point for the processor.
context - The processing context to use when initializing the segments, if any.
- Returns:
- A CompletableFuture that completes when the segments have been initialized.
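The initialization contract described above (segments 0 through segmentCount - 1, each seeded with the initial token, failing if a segment already exists) can be illustrated with a small in-memory sketch. This is not the JpaTokenStore implementation: the class SegmentInitSketch, the use of a long in place of a TrackingToken, and IllegalStateException in place of UnableToClaimTokenException are all assumptions made so that the example runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in; NOT the JpaTokenStore implementation.
class SegmentInitSketch {

    // Key: "processorName/segmentId". A long stands in for a TrackingToken.
    private final Map<String, Long> tokens = new ConcurrentHashMap<>();

    CompletableFuture<List<Integer>> initializeTokenSegments(String processorName,
                                                             int segmentCount,
                                                             long initialToken) {
        List<Integer> created = new ArrayList<>();
        for (int segmentId = 0; segmentId < segmentCount; segmentId++) {
            String key = processorName + "/" + segmentId;
            // putIfAbsent returns the previous value when the segment already exists.
            if (tokens.putIfAbsent(key, initialToken) != null) {
                // IllegalStateException is a stand-in for UnableToClaimTokenException.
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Segment already exists: " + key));
            }
            created.add(segmentId);
        }
        return CompletableFuture.completedFuture(created);
    }

    public static void main(String[] args) {
        SegmentInitSketch store = new SegmentInitSketch();
        // First call creates segment ids 0 through 3.
        System.out.println(store.initializeTokenSegments("my-processor", 4, 0L).join());
        // Second call fails because the segments already exist.
        System.out.println(store.initializeTokenSegments("my-processor", 4, 0L)
                                .isCompletedExceptionally());
    }
}
```

Unlike a transactional store, this sketch does not roll back segments created before a failure; it only shows the range of segment ids and the exceptional completion.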
-
storeToken
@Nonnull public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Stores the given token in the store.
Returns a CompletableFuture that completes when the token was successfully stored. The returned future will complete exceptionally with an UnableToClaimTokenException if the token was not initialized, or was claimed by another process.
The token marks the current position of the process with the given processorName and segment. The given token may be null. Any claims made by the current process have their timestamp updated.
- Specified by:
storeToken in interface TokenStore
- Parameters:
token - The token to store for a given process and segment. May be null.
processorName - The name of the process for which to store the token.
segment - The index of the segment for which to store the token.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the token has been stored.
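Because failures such as a lost claim are reported through the returned future rather than thrown directly, callers need to inspect the outcome of the future. The following sketch shows one way to do that with the standard CompletableFuture API. The class StoreResultSketch and its describe method are hypothetical helpers, and IllegalStateException stands in for UnableToClaimTokenException so the example needs nothing beyond the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper showing how a caller might react to a store-style future.
class StoreResultSketch {

    // Turns the outcome of a store-style future into a short status string.
    static String describe(CompletableFuture<Void> result) {
        return result.handle((ignored, error) -> {
            if (error == null) {
                return "stored";
            }
            // Dependent stages wrap failures in CompletionException; unwrap to reach the cause.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause()
                    : error;
            return "failed: " + cause.getMessage();
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(describe(CompletableFuture.completedFuture(null)));
        // IllegalStateException is a stand-in for UnableToClaimTokenException.
        CompletableFuture<Void> failed =
                CompletableFuture.failedFuture(new IllegalStateException("claimed by another process"));
        System.out.println(describe(failed));
    }
}
```

Unwrapping CompletionException matters once the future has passed through stages such as thenApply, since those stages wrap the original exception.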
-
releaseClaim
@Nonnull public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Releases a claim of the token for the given processorName and segment. If no such claim existed, nothing happens.
The caller must ensure not to use any streams opened based on the token for which the claim is released.
- Specified by:
releaseClaim in interface TokenStore
- Parameters:
processorName - The name of the process owning the token (e.g. a PooledStreamingEventProcessor name).
segment - The segment for which a token was obtained.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the claim release has been completed.
-
initializeSegment
@Nonnull public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Initializes the given segment for the processor with the given processorName to contain the given token.
Returns a CompletableFuture that completes when the token segment has been successfully initialized. The returned future will complete exceptionally with an UnableToInitializeTokenException if the token already exists.
This method fails if a token already exists for the given processor and segment, even if that token has been claimed by the active instance. This method will not claim the initialized segment. Use TokenStore.fetchToken(String, int, ProcessingContext) to retrieve and claim the token.
- Specified by:
initializeSegment in interface TokenStore
- Parameters:
token - The token to initialize the segment with.
processorName - The name of the processor to create the segment for.
segment - The segment to initialize.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the segment has been initialized.
-
deleteToken
@Nonnull public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Deletes the token associated with the specified processorName and segmentId.
Returns a CompletableFuture that completes when the token has been successfully deleted. The returned future will complete exceptionally with an UnableToClaimTokenException if the token is not currently claimed by this node.
The token must be owned by the current process (JVM instance) to be able to delete it.
- Specified by:
deleteToken in interface TokenStore
- Parameters:
processorName - The name of the processor to remove the token for.
segment - The segment to delete.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the token deletion is complete.
-
fetchToken
@Nonnull public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Fetches the last stored token for the given processorName and segmentId.
Returns a CompletableFuture that completes with the fetched token, or null if the token was null. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int, ProcessingContext).
- Specified by:
fetchToken in interface TokenStore
- Parameters:
processorName - The process name for which to fetch the token.
segment - The segment index for which to fetch the token.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with the last stored TrackingToken or null if the store holds no token for the given process and segment.
-
fetchToken
@Nonnull public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Fetches the last stored token for the given processorName and segment.
Returns a CompletableFuture that completes with the fetched token, or null if the token was null. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int, ProcessingContext).
- Specified by:
fetchToken in interface TokenStore
- Parameters:
processorName - The process name for which to fetch the token.
segment - The segment for which to fetch the token.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with the last stored token or null if the store holds no token for the given process and segment.
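The claim semantics shared by fetchToken and releaseClaim (fetching claims the token for the calling node, a second node is rejected until the claim is released) can be sketched in memory as follows. Everything here is a simplified assumption for illustration: the class ClaimSketch, the explicit node argument, the long standing in for a TrackingToken, and IllegalStateException standing in for UnableToClaimTokenException. Claim timeouts and database transactions are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory stand-in for claim handling; NOT the JpaTokenStore implementation.
class ClaimSketch {

    // A token position plus the current owner (null when unclaimed).
    static final class Entry {
        final long token;
        String owner;

        Entry(long token) {
            this.token = token;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    synchronized void initialize(String processorName, int segment, long token) {
        entries.put(processorName + "/" + segment, new Entry(token));
    }

    // Fetching claims the token for the given node, or fails if it is missing or owned elsewhere.
    synchronized CompletableFuture<Long> fetchToken(String processorName, int segment, String node) {
        Entry entry = entries.get(processorName + "/" + segment);
        if (entry == null || (entry.owner != null && !entry.owner.equals(node))) {
            return CompletableFuture.failedFuture(new IllegalStateException("Unable to claim token"));
        }
        entry.owner = node;
        return CompletableFuture.completedFuture(entry.token);
    }

    // Releasing a claim that does not exist is a no-op.
    synchronized CompletableFuture<Void> releaseClaim(String processorName, int segment, String node) {
        Entry entry = entries.get(processorName + "/" + segment);
        if (entry != null && node.equals(entry.owner)) {
            entry.owner = null;
        }
        return CompletableFuture.completedFuture(null);
    }

    public static void main(String[] args) {
        ClaimSketch store = new ClaimSketch();
        store.initialize("my-processor", 0, 42L);
        System.out.println(store.fetchToken("my-processor", 0, "node-a").join());
        System.out.println(store.fetchToken("my-processor", 0, "node-b").isCompletedExceptionally());
        store.releaseClaim("my-processor", 0, "node-a").join();
        System.out.println(store.fetchToken("my-processor", 0, "node-b").join());
    }
}
```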
-
extendClaim
@Nonnull public CompletableFuture<Void> extendClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Extends the claim on the current token held by this node for the given processorName and segment.
Returns a CompletableFuture that completes when the claim has been successfully extended. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.
- Specified by:
extendClaim in interface TokenStore
- Parameters:
processorName - The process name for which to extend the claim.
segment - The segment index for which to extend the claim.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that completes when the claim extension has been completed.
-
fetchSegment
@Nonnull public CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Returns a CompletableFuture that supplies the specified Segment, or null if there was no such segment.
- Specified by:
fetchSegment in interface TokenStore
- Parameters:
processorName - The process name for which to fetch the segment.
segmentId - The segment index to fetch.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with the segment, or null, on completion.
-
fetchSegments
@Nonnull public CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Returns a CompletableFuture that supplies a list of known segments for a given processorName on completion.
The segments returned are segments for which a token has been stored previously. When the TokenStore is empty, the CompletableFuture will return an empty list.
- Specified by:
fetchSegments in interface TokenStore
- Parameters:
processorName - The process name for which to fetch the segments.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture with a list of segments on completion.
-
fetchAvailableSegments
@Nonnull public CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context)
Description copied from interface: TokenStore
Returns a CompletableFuture supplying a List of known available Segments for a given processorName.
A segment is considered available if it is not claimed by any other event processor.
The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the TokenStore is empty, an empty list is returned.
- Specified by:
fetchAvailableSegments in interface TokenStore
- Parameters:
processorName - The processor's name for which to fetch the segments.
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture supplying a List of available Segments for the specified processorName.
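The availability rule stated above can be read as a simple filter over the stored segments. The sketch below takes that reading literally: a segment counts as available when it has no owner or when its owner is the calling node. This is an interpretation of the wording, not the JpaTokenStore query, which may apply further conditions such as claim expiry. The class AvailableSegmentsSketch, the map from segment id to owner, and the node names are assumptions for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical filter illustrating the "not claimed by another processor" rule.
class AvailableSegmentsSketch {

    // ownerBySegment maps a segment id to its owner; a null owner means the segment is unclaimed.
    static List<Integer> availableSegments(Map<Integer, String> ownerBySegment, String thisNode) {
        // TreeMap gives a stable ordering by segment id and permits null values.
        return new TreeMap<>(ownerBySegment).entrySet().stream()
                .filter(e -> e.getValue() == null || e.getValue().equals(thisNode))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<Integer, String> owners = new HashMap<>();
        owners.put(0, null);      // unclaimed
        owners.put(1, "node-a");  // claimed by the calling node
        owners.put(2, "node-b");  // claimed by another node
        System.out.println(availableSegments(owners, "node-a")); // prints [0, 1]
    }
}
```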
-
loadToken
protected TokenEntry loadToken(String processorName, int segment, jakarta.persistence.EntityManager entityManager)
Loads an existing TokenEntry or creates a new one using the given entityManager for the given processorName and segment.
- Parameters:
processorName - The name of the event processor.
segment - The segment of the event processor.
entityManager - The entity manager instance to use for the query.
- Returns:
- The token entry for the given processor name and segment.
- Throws:
UnableToClaimTokenException - If there is a token for the given processorName and segment, but it is claimed by another process.
-
loadToken
protected TokenEntry loadToken(String processorName, Segment segment, jakarta.persistence.EntityManager entityManager)
Tries loading an existing token owned by a processor with the given processorName and segment. If such a token entry exists, an attempt will be made to claim the token. If that succeeds, the token will be returned. If the token is already owned by another node, an UnableToClaimTokenException will be thrown.
If no such token exists yet, a new token entry will be inserted with a null token, owned by this node, and this method returns null.
If a token has been claimed, the segment will be validated by checking the database for the split and merge candidate segments. If a concurrent split or merge operation has been detected, the claim will be released and an UnableToClaimTokenException will be thrown.
- Parameters:
processorName - The name of the processor to load or insert a token entry for.
segment - The segment of the processor to load or insert a token entry for.
entityManager - The entity manager instance to use for the query.
- Returns:
- The tracking token of the fetched entry or null if a new entry was inserted.
- Throws:
UnableToClaimTokenException - If the token cannot be claimed because another node currently owns the token or if the segment has been split or merged concurrently.
-
retrieveStorageIdentifier
@Nonnull public CompletableFuture<String> retrieveStorageIdentifier(@Nullable ProcessingContext context)
Description copied from interface: TokenStore
Retrieves the storage identifier associated with this store. The returned identifier uniquely identifies the storage location for the tokens in this store.
Returns a CompletableFuture that completes with the storage identifier associated with this store. The returned future will complete exceptionally with an UnableToRetrieveIdentifierException if the identifier could not be retrieved.
Two token store implementations that share state must return the same identifier. Two token store implementations that do not share a location must return a different identifier (or an empty optional if identifiers are not supported).
Note that this method may require the implementation to consult its underlying storage. Therefore, a transaction should be active when this method is called, similarly to invocations like TokenStore.fetchToken(String, int, ProcessingContext), TokenStore.fetchSegments(String, ProcessingContext), etc. When no transaction is active, the behavior is undefined.
- Specified by:
retrieveStorageIdentifier in interface TokenStore
- Parameters:
context - The current ProcessingContext, if any.
- Returns:
- A CompletableFuture that provides an identifier to uniquely identify the storage location of tokens in this TokenStore on completion.
-
converter
Returns the Converter used by the TokenStore to serialize tokens.
- Returns:
- The Converter used by the TokenStore to serialize tokens.
-