java.lang.Object
org.axonframework.messaging.eventhandling.processing.streaming.token.store.jpa.JpaTokenStore
All Implemented Interfaces:
TokenStore

public class JpaTokenStore extends Object implements TokenStore
Implementation of a token store that uses JPA to save and load tokens. This implementation uses TokenEntry entities.
Since:
3.0.0
Author:
Rene de Waele
  • Constructor Details

  • Method Details

    • initializeTokenSegments

      @Nonnull public CompletableFuture<List<Segment>> initializeTokenSegments(@Nonnull String processorName, int segmentCount, @Nullable TrackingToken initialToken, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Initializes a given number of segments for the given processorName to track its tokens.

      Returns a CompletableFuture that completes when the segments were successfully initialized. The returned future will complete exceptionally with an UnableToClaimTokenException when a segment to be initialized already exists.

      This method should only be invoked when no tokens have been stored for the given processor yet.

      This method stores the given initialToken for all segments as the starting point for the processor with the given processorName, but does not claim them. It creates the segments numbered 0 through segmentCount - 1.

      If tokens are already present and are not owned by the initializing process, the behavior of this method is undefined.

      Specified by:
      initializeTokenSegments in interface TokenStore
      Parameters:
      processorName - The name of the processor to initialize segments for.
      segmentCount - The number of segments to initialize.
      initialToken - The initial token which is used as a starting point for the processor.
      context - The processing context to use when initializing the segments, if any.
      Returns:
      A CompletableFuture that completes when the segments have been initialized.
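
      The segment numbering and the "fail if a segment already exists" rule described above can be illustrated with a small in-memory sketch. This is not the JpaTokenStore implementation: the class SegmentInitSketch, the use of a Long as a stand-in for a TrackingToken, and the use of IllegalStateException in place of UnableToClaimTokenException are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory stand-in; NOT the JpaTokenStore implementation.
class SegmentInitSketch {

    // Key: "processorName/segmentId". Value: stored position (a Long stands in for a TrackingToken; may be null).
    private final Map<String, Long> tokens = new HashMap<>();

    synchronized CompletableFuture<List<Integer>> initializeTokenSegments(String processorName,
                                                                          int segmentCount,
                                                                          Long initialToken) {
        // Refuse to create anything if one of the target segments already exists.
        for (int id = 0; id < segmentCount; id++) {
            if (tokens.containsKey(processorName + "/" + id)) {
                // IllegalStateException stands in for UnableToClaimTokenException.
                return CompletableFuture.failedFuture(
                        new IllegalStateException("Segment " + id + " already exists for " + processorName));
            }
        }
        // Create segments 0 through segmentCount - 1, all starting at initialToken and all unclaimed.
        List<Integer> created = new ArrayList<>();
        for (int id = 0; id < segmentCount; id++) {
            tokens.put(processorName + "/" + id, initialToken);
            created.add(id);
        }
        return CompletableFuture.completedFuture(created);
    }
}
```

      In this sketch a second call for the same processor completes exceptionally, which mirrors the documented behavior when a segment to be initialized already exists.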
    • storeToken

      @Nonnull public CompletableFuture<Void> storeToken(@Nullable TrackingToken token, @Nonnull String processorName, int segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Stores the given token in the store.

      Returns a CompletableFuture that completes when the token was successfully stored. The returned future will complete exceptionally with an UnableToClaimTokenException if the token was not initialized, or was claimed by another process.

      The token marks the current position of the process with given processorName and segment. The given token may be null. Any claims made by the current process have their timestamp updated.

      Specified by:
      storeToken in interface TokenStore
      Parameters:
      token - The token to store for a given process and segment. May be null.
      processorName - The name of the process for which to store the token.
      segment - The index of the segment for which to store the token.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that completes when the token has been stored.
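
      Because failures are reported through the returned future rather than thrown directly, callers typically inspect the outcome with standard CompletableFuture operations. The sketch below uses only JDK calls; the class StoreOutcomeSketch and the exception ClaimFailed (a stand-in for UnableToClaimTokenException) are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical caller-side helper; not part of the TokenStore API.
class StoreOutcomeSketch {

    // Stand-in for UnableToClaimTokenException.
    static class ClaimFailed extends RuntimeException {
        ClaimFailed(String message) {
            super(message);
        }
    }

    // Dependent stages wrap the original failure in a CompletionException, so unwrap it first.
    static Throwable unwrap(Throwable failure) {
        return (failure instanceof CompletionException && failure.getCause() != null)
                ? failure.getCause()
                : failure;
    }

    // Maps the outcome of a store operation to a short description.
    static CompletableFuture<String> describe(CompletableFuture<Void> storeResult) {
        return storeResult.handle((ignored, failure) -> {
            if (failure == null) {
                return "stored";
            }
            Throwable cause = unwrap(failure);
            return cause instanceof ClaimFailed
                    ? "claim lost: " + cause.getMessage()
                    : "failed: " + cause;
        });
    }
}
```

      A caller that detects a lost claim would usually stop processing the segment, since another process may now own the token.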
    • releaseClaim

      @Nonnull public CompletableFuture<Void> releaseClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Release a claim of the token for given processorName and segment. If no such claim existed, nothing happens.

      The caller must stop using any streams that were opened based on the token whose claim is released.

      Specified by:
      releaseClaim in interface TokenStore
      Parameters:
      processorName - The name of the process owning the token (e.g. a PooledStreamingEventProcessor name).
      segment - The segment for which a token was obtained.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that completes when the claim-release has been completed.
    • initializeSegment

      @Nonnull public CompletableFuture<Void> initializeSegment(@Nullable TrackingToken token, @Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Initializes the given segment for the processor with the given processorName to contain the given token.

      Returns a CompletableFuture that completes when the token segment has been successfully initialized. The returned future will complete exceptionally with an UnableToInitializeTokenException if the token already exists.

      This method fails if a token already exists for the given processor and segment, even if that token has been claimed by the active instance. This method will not claim the initialized segment. Use TokenStore.fetchToken(String, int, ProcessingContext) to retrieve and claim the token.

      Specified by:
      initializeSegment in interface TokenStore
      Parameters:
      token - The token to initialize the segment with.
      processorName - The name of the processor to create the segment for.
      segment - The segment to initialize.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that completes when the segment has been initialized.
    • deleteToken

      @Nonnull public CompletableFuture<Void> deleteToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Deletes the token associated with the specified processorName and segment.

      Returns a CompletableFuture that completes when the token has been successfully deleted. The returned future will complete exceptionally with an UnableToClaimTokenException if the token is not currently claimed by this node.

      The token must be owned by the current process (JVM instance) to be able to delete it.

      Specified by:
      deleteToken in interface TokenStore
      Parameters:
      processorName - The name of the processor to remove the token for.
      segment - The segment to delete.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that completes when the token deletion is complete.
    • fetchToken

      @Nonnull public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Fetches the last stored token for the given processorName and segment.

      Returns a CompletableFuture that completes with the fetched token, or null if the token was null. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.

      The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int, ProcessingContext).

      Specified by:
      fetchToken in interface TokenStore
      Parameters:
      processorName - The process name for which to fetch the token.
      segment - The segment index for which to fetch the token.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture with the last stored TrackingToken or null if the store holds no token for the given process and segment.
    • fetchToken

      @Nonnull public CompletableFuture<TrackingToken> fetchToken(@Nonnull String processorName, @Nonnull Segment segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Fetches the last stored token for the given processorName and segment.

      Returns a CompletableFuture that completes with the fetched token, or null if the token was null. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.

      The token will be claimed by the current process (JVM instance), preventing access by other instances. To release the claim, use TokenStore.releaseClaim(String, int, ProcessingContext).

      Specified by:
      fetchToken in interface TokenStore
      Parameters:
      processorName - The process name for which to fetch the token.
      segment - The segment for which to fetch the token.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture with the last stored token or null if the store holds no token for the given process and segment.
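
      The claim rules described for fetchToken and releaseClaim (fetching claims the token, a second process is refused, and releasing makes the token available again) can be modeled in memory as follows. This is a sketch under stated assumptions, not the JPA implementation: ClaimSketch, ClaimFailed (a stand-in for UnableToClaimTokenException), the explicit node parameter, and the Long position are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical in-memory model of the claim rules; NOT the JpaTokenStore implementation.
class ClaimSketch {

    // Stand-in for UnableToClaimTokenException.
    static class ClaimFailed extends RuntimeException {
        ClaimFailed(String message) {
            super(message);
        }
    }

    private static final class Entry {
        Long token;   // stored position; may be null
        String owner; // node holding the claim; null when unclaimed
    }

    private final Map<String, Entry> entries = new HashMap<>();

    // Creates an unclaimed entry, comparable to initializing a segment.
    synchronized void initialize(String processorName, int segment, Long token) {
        Entry entry = new Entry();
        entry.token = token;
        entries.put(processorName + "/" + segment, entry);
    }

    // Returns the stored token and claims it for the given node.
    synchronized CompletableFuture<Long> fetchToken(String processorName, int segment, String node) {
        Entry entry = entries.get(processorName + "/" + segment);
        if (entry == null) {
            return CompletableFuture.failedFuture(new ClaimFailed("no token for segment " + segment));
        }
        if (entry.owner != null && !entry.owner.equals(node)) {
            return CompletableFuture.failedFuture(new ClaimFailed("claimed by " + entry.owner));
        }
        entry.owner = node;
        return CompletableFuture.completedFuture(entry.token);
    }

    // Releases the claim if the given node holds it; otherwise nothing happens.
    synchronized CompletableFuture<Void> releaseClaim(String processorName, int segment, String node) {
        Entry entry = entries.get(processorName + "/" + segment);
        if (entry != null && node.equals(entry.owner)) {
            entry.owner = null;
        }
        return CompletableFuture.completedFuture(null);
    }
}
```

      In the real store the owner is the current process rather than a method argument; the node parameter here only makes it possible to simulate two processes in one JVM.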
    • extendClaim

      @Nonnull public CompletableFuture<Void> extendClaim(@Nonnull String processorName, int segment, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Extends the claim on the current token held by this node for the given processorName and segment.

      Returns a CompletableFuture that completes when the claim has been successfully extended. The returned future will complete exceptionally with an UnableToClaimTokenException if the token did not exist or was claimed by another process.

      Specified by:
      extendClaim in interface TokenStore
      Parameters:
      processorName - The process name for which to extend the claim.
      segment - The segment index for which to extend the claim.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that completes when the claim-extension has been completed.
    • fetchSegment

      @Nonnull public CompletableFuture<Segment> fetchSegment(@Nonnull String processorName, int segmentId, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Returns a CompletableFuture that supplies the specified Segment, or null if there was no such segment.
      Specified by:
      fetchSegment in interface TokenStore
      Parameters:
      processorName - The process name for which to fetch the segment.
      segmentId - The segment index to fetch.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that completes with the segment, or with null if there is no such segment.
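
      Since the future may complete with null, callers may prefer to make the "no such segment" case explicit. One possible approach, using only JDK types, is to map the result to an Optional. The helper NullableResultSketch.asOptional is a hypothetical name and not part of the TokenStore API.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical caller-side helper; not part of the TokenStore API.
class NullableResultSketch {

    // Converts a future that may complete with null into a future of Optional.
    static <T> CompletableFuture<Optional<T>> asOptional(CompletableFuture<T> future) {
        return future.thenApply(Optional::ofNullable);
    }
}
```

      A caller could then write asOptional(store.fetchSegment(...)) and branch on isPresent() instead of performing a null check.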
    • fetchSegments

      @Nonnull public CompletableFuture<List<Segment>> fetchSegments(@Nonnull String processorName, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Returns a CompletableFuture that supplies a list of known segments for a given processorName on completion.

      The segments returned are segments for which a token has been stored previously. When the TokenStore is empty, the CompletableFuture completes with an empty list.

      Specified by:
      fetchSegments in interface TokenStore
      Parameters:
      processorName - The process name for which to fetch the segments.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture with a list of segments on completion.
    • fetchAvailableSegments

      @Nonnull public CompletableFuture<List<Segment>> fetchAvailableSegments(@Nonnull String processorName, @Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Returns a CompletableFuture supplying a List of known available Segments for a given processorName.

      A segment is considered available if it is not claimed by any other event processor.

      The segments returned are segments for which a token has been stored previously and have not been claimed by another processor. When the TokenStore is empty, an empty list is returned.

      Specified by:
      fetchAvailableSegments in interface TokenStore
      Parameters:
      processorName - The processor's name for which to fetch the segments.
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture supplying a List of available Segments for the specified processorName.
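
      The availability rule above (a segment is available when no other processor has claimed it) amounts to a filter over the stored entries. The sketch below is illustrative only: AvailabilitySketch, Row, and the node argument are hypothetical, and it ignores any further criteria the real store may apply.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the availability rule; NOT the JpaTokenStore query.
class AvailabilitySketch {

    // One stored token entry: its segment id and the node that claimed it (null when unclaimed).
    static final class Row {
        final int segmentId;
        final String owner;

        Row(int segmentId, String owner) {
            this.segmentId = segmentId;
            this.owner = owner;
        }
    }

    // A segment is available to a node when it is unclaimed or already claimed by that same node.
    static List<Integer> availableFor(String node, List<Row> rows) {
        return rows.stream()
                   .filter(row -> row.owner == null || row.owner.equals(node))
                   .map(row -> row.segmentId)
                   .collect(Collectors.toList());
    }
}
```

      With no rows at all, the filter yields an empty list, matching the documented result for an empty TokenStore.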
    • loadToken

      protected TokenEntry loadToken(String processorName, int segment, jakarta.persistence.EntityManager entityManager)
      Loads an existing TokenEntry or creates a new one using the given entityManager for given processorName and segment.
      Parameters:
      processorName - The name of the event processor.
      segment - The segment of the event processor.
      entityManager - The entity manager instance to use for the query.
      Returns:
      The token entry for the given processor name and segment.
      Throws:
      UnableToClaimTokenException - If there is a token for given processorName and segment, but it is claimed by another process.
    • loadToken

      protected TokenEntry loadToken(String processorName, Segment segment, jakarta.persistence.EntityManager entityManager)
      Tries loading an existing token owned by a processor with given processorName and segment. If such a token entry exists an attempt will be made to claim the token. If that succeeds the token will be returned. If the token is already owned by another node an UnableToClaimTokenException will be thrown.

      If no such token exists yet, a new token entry will be inserted with a null token, owned by this node, and this method returns null.

      If a token has been claimed, the segment will be validated by checking the database for the split and merge candidate segments. If a concurrent split or merge operation has been detected, the claim will be released and an UnableToClaimTokenException will be thrown.

      Parameters:
      processorName - The name of the processor to load or insert a token entry for.
      segment - The segment of the processor to load or insert a token entry for.
      entityManager - The entity manager instance to use for the query.
      Returns:
      The tracking token of the fetched entry or null if a new entry was inserted.
      Throws:
      UnableToClaimTokenException - If the token cannot be claimed because another node currently owns the token or if the segment has been split or merged concurrently.
    • retrieveStorageIdentifier

      @Nonnull public CompletableFuture<String> retrieveStorageIdentifier(@Nullable ProcessingContext context)
      Description copied from interface: TokenStore
      Retrieves the storage identifier associated with this store. The returned identifier uniquely identifies the storage location for the tokens in this store.

      Returns a CompletableFuture that completes with the storage identifier associated with this store. The returned future will complete exceptionally with an UnableToRetrieveIdentifierException if the identifier could not be retrieved.

      Two token store implementations that share state must return the same identifier. Two token store implementations that do not share a location must return a different identifier (or an empty optional if identifiers are not supported).

      Note that this method may require the implementation to consult its underlying storage. Therefore, a transaction should be active when this method is called, similarly to invocations like TokenStore.fetchToken(String, int, ProcessingContext), TokenStore.fetchSegments(String, ProcessingContext), etc. When no transaction is active, the behavior is undefined.

      Specified by:
      retrieveStorageIdentifier in interface TokenStore
      Parameters:
      context - The current ProcessingContext, if any.
      Returns:
      A CompletableFuture that provides an identifier to uniquely identify the storage location of tokens in this TokenStore on completion.
    • converter

      @Internal public Converter converter()
      Returns the Converter used by the TokenStore to serialize tokens.
      Returns:
      The Converter used by the TokenStore to serialize tokens.