Class PooledStreamingEventProcessor

java.lang.Object
org.axonframework.messaging.eventhandling.processing.streaming.pooled.PooledStreamingEventProcessor
All Implemented Interfaces:
DescribableComponent, EventProcessor, StreamingEventProcessor

public class PooledStreamingEventProcessor extends Object implements StreamingEventProcessor
A StreamingEventProcessor implementation which pools its resources to enhance processing speed. It utilizes a Coordinator as the means to stream events from a StreamableEventSource and creates so-called work packages. Every work package is in charge of a Segment of the entire event stream. It is the Coordinator's job to retrieve the events from the source and provide the events to all the work packages it is in charge of.

This approach utilizes two thread pools: the coordinator thread pool, which retrieves the events and provides them to the work packages, and the work package thread pool, which actually handles the events. It is this approach which allows for greater parallelization and processing speed than the TrackingEventProcessor (removed in 5.0.0).
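
The division of labour between the two pools can be sketched in plain Java. This is an illustrative model only: TwoPoolSketch, process, and the hash-based routing are assumptions made for the example and are not part of the Axon Framework API. The sketch also does not model per-segment ordering, tokens, or claims.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative model only; none of these names are Axon Framework API. */
final class TwoPoolSketch {

    /** Routes every event to one of segmentCount "work packages" and returns what each one handled. */
    static Map<Integer, List<String>> process(List<String> events, int segmentCount) {
        ExecutorService coordinatorPool = Executors.newSingleThreadExecutor();
        ExecutorService workPackagePool = Executors.newFixedThreadPool(4);
        Map<Integer, List<String>> handled = new ConcurrentHashMap<>();
        for (int i = 0; i < segmentCount; i++) {
            handled.put(i, new CopyOnWriteArrayList<>());
        }
        try {
            // Coordinator task: reads the events and hands each one to the pool that handles it.
            coordinatorPool.submit(() -> {
                for (String event : events) {
                    int segment = Math.floorMod(event.hashCode(), segmentCount);
                    // Work package task: the actual event handling runs on the second pool.
                    workPackagePool.execute(() -> handled.get(segment).add(event));
                }
            }).get();
            workPackagePool.shutdown();
            if (!workPackagePool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Work packages did not finish in time");
            }
            return handled;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Processing failed", e);
        } finally {
            coordinatorPool.shutdownNow();
            workPackagePool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(process(List.of("a", "b", "c", "d"), 2));
    }
}
```

Because the coordinator only reads and dispatches, slow handling in one work package does not stop events from being fetched for the others.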

If no TrackingTokens are present for this processor, the PooledStreamingEventProcessor will initialize them for a configured number of segments. By default, it will create 16 segments. This number can be configured through PooledStreamingEventProcessorConfiguration.initialSegmentCount(int).

Since:
4.5.0
Author:
Allard Buijze, Steven van Beelen
  • Constructor Details

  • Method Details

    • name

      public String name()
      Description copied from interface: EventProcessor
      Returns the name of this event processor. This name is used to detect distributed instances of the same event processor. Multiple instances referring to the same logical event processor (on different JVMs) must have the same name.
      Specified by:
      name in interface EventProcessor
      Returns:
      the name of this event processor
    • start

      public CompletableFuture<Void> start()
      Description copied from interface: EventProcessor
      Initiates a start, providing a CompletableFuture that completes when the start process is finished.
      Specified by:
      start in interface EventProcessor
      Returns:
      a CompletableFuture that completes when the start process is finished.
    • shutdown

      public CompletableFuture<Void> shutdown()
      Description copied from interface: EventProcessor
      Initiates a shutdown, providing a CompletableFuture that completes when the shutdown process is finished.
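
      Since both start() and shutdown() return a CompletableFuture<Void>, a restart can be expressed by chaining the two. The following is a minimal sketch: Lifecycle is a hypothetical stand-in that only mirrors the two signatures documented here, and restart and demo are names invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative only: Lifecycle is a hypothetical stand-in, not an Axon Framework type. */
final class RestartSketch {

    interface Lifecycle {
        CompletableFuture<Void> start();
        CompletableFuture<Void> shutdown();
    }

    /** Starts the processor again only after the shutdown future has completed. */
    static CompletableFuture<Void> restart(Lifecycle processor) {
        return processor.shutdown().thenCompose(ignored -> processor.start());
    }

    /** Exercises restart() against a stub that records the order of the calls. */
    static List<String> demo() {
        List<String> calls = new ArrayList<>();
        Lifecycle stub = new Lifecycle() {
            @Override
            public CompletableFuture<Void> start() {
                calls.add("start");
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public CompletableFuture<Void> shutdown() {
                calls.add("shutdown");
                return CompletableFuture.completedFuture(null);
            }
        };
        restart(stub).join();
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```
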
      Specified by:
      shutdown in interface EventProcessor
      Returns:
      a CompletableFuture that completes when the shutdown process is finished.
    • isRunning

      public boolean isRunning()
      Description copied from interface: EventProcessor
      Indicates whether this processor is currently running (i.e. consuming events from its message source).
      Specified by:
      isRunning in interface EventProcessor
      Returns:
      true when running, otherwise false
    • isError

      public boolean isError()
      Description copied from interface: EventProcessor
      Indicates whether the processor has been shut down due to an error. In such a case, the processor has forcefully shut down, as it wasn't able to automatically recover.

      Note that this method returns false when the processor was stopped using EventProcessor.shutdown().

      Specified by:
      isError in interface EventProcessor
      Returns:
      true when paused due to an error, otherwise false
    • getTokenStoreIdentifier

      public String getTokenStoreIdentifier()
      Description copied from interface: StreamingEventProcessor
      Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.
      Specified by:
      getTokenStoreIdentifier in interface StreamingEventProcessor
      Returns:
      the unique identifier of the TokenStore used by this StreamingEventProcessor
    • releaseSegment

      public CompletableFuture<Void> releaseSegment(int segmentId)
      Description copied from interface: StreamingEventProcessor
      Instructs the processor to release the segment with given segmentId.
      Specified by:
      releaseSegment in interface StreamingEventProcessor
      Parameters:
      segmentId - the id of the segment to release
    • releaseSegment

      public CompletableFuture<Void> releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)
      Description copied from interface: StreamingEventProcessor
      Instructs the processor to release the segment with given segmentId. This processor will not try to claim the given segment for the specified releaseDuration in the given unit, to ensure it is not immediately reclaimed. Note that this will override any previous release duration that existed for this segment. Providing a negative value will allow the segment to be immediately claimed.

      If the processor is not actively processing the segment with given segmentId, claiming it will be ignored for the given timeframe nonetheless.
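
      The release semantics described above can be modelled as a per-segment deadline. This is a sketch under assumptions, not the processor's implementation: ReleaseDeadlineSketch, release, and mayClaim are hypothetical names, and the current time is passed in explicitly to keep the example deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Illustrative model of release durations; not Axon Framework code. */
final class ReleaseDeadlineSketch {

    // Segment id -> earliest moment (in epoch millis) at which the segment may be claimed again.
    private final Map<Integer, Long> releaseDeadlines = new ConcurrentHashMap<>();

    /** Records a release; a later call for the same segment overrides the earlier deadline. */
    void release(int segmentId, long releaseDuration, TimeUnit unit, long nowMillis) {
        releaseDeadlines.put(segmentId, nowMillis + unit.toMillis(releaseDuration));
    }

    /** A segment may be claimed when it has no deadline or its deadline has passed. */
    boolean mayClaim(int segmentId, long nowMillis) {
        Long deadline = releaseDeadlines.get(segmentId);
        return deadline == null || nowMillis >= deadline;
    }

    public static void main(String[] args) {
        ReleaseDeadlineSketch sketch = new ReleaseDeadlineSketch();
        sketch.release(3, 10, TimeUnit.SECONDS, 1_000);
        System.out.println(sketch.mayClaim(3, 5_000));   // still inside the release window
        sketch.release(3, -1, TimeUnit.SECONDS, 5_000);  // negative duration: deadline lies in the past
        System.out.println(sketch.mayClaim(3, 5_000));
    }
}
```

      In this model a negative duration produces a deadline in the past, which is why the segment becomes claimable immediately.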

      Specified by:
      releaseSegment in interface StreamingEventProcessor
      Parameters:
      segmentId - the id of the segment to be released for the specified releaseDuration
      releaseDuration - the amount of time to disregard segmentId for processing
      unit - the unit of time used to express the releaseDuration
    • claimSegment

      public CompletableFuture<Boolean> claimSegment(int segmentId)
      Description copied from interface: StreamingEventProcessor
      Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible.

      The given segmentId must not be currently processed by a different processor instance, as that will have an active claim on the token. Claiming a segment that is already being processed will have no effect and return true.

      A true return value indicates that the segment has been claimed and will be processed by this processor. The StreamingEventProcessor may postpone start of work until after completion of this task, as long as the token has been claimed so work can be started. A return value of false indicates that the segment has not been claimed due to the token for that segment not being available.

      Specified by:
      claimSegment in interface StreamingEventProcessor
      Parameters:
      segmentId - the identifier of the segment to claim and start processing
      Returns:
      a CompletableFuture providing the result of the claim operation
    • splitSegment

      public CompletableFuture<Boolean> splitSegment(int segmentId)
      Description copied from interface: StreamingEventProcessor
      Instructs the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it.

      To be able to split segments, the TokenStore configured with this processor must use explicitly initialized tokens. Also, the given segmentId must be currently processed by a process owned by this processor instance.
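
      To give an intuition for what a split produces, the sketch below models a segment as an id plus a bit mask, where an event belongs to the segment if the masked hash of its routing value equals the id. This representation is an assumption made for illustration; the Segment record and its methods are defined here and are not the framework's class.

```java
/** Simplified, assumed model of segment splitting; not the Axon Framework Segment class. */
final class SegmentSplitSketch {

    record Segment(int id, int mask) {

        /** An event belongs to this segment when the masked hash equals the segment id. */
        boolean matches(int hash) {
            return (hash & mask) == id;
        }

        /** Splits this segment into two halves that together cover exactly the same hashes. */
        Segment[] split() {
            int newMask = (mask << 1) | 1;
            int newBit = newMask ^ mask; // the single bit that the wider mask adds
            return new Segment[]{new Segment(id, newMask), new Segment(id | newBit, newMask)};
        }
    }

    public static void main(String[] args) {
        Segment root = new Segment(0, 0);
        for (Segment half : root.split()) {
            System.out.println(half);
        }
    }
}
```

      Every hash matched by the original segment is matched by exactly one of the two halves, so a second process can take over one half without events being handled twice.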

      Specified by:
      splitSegment in interface StreamingEventProcessor
      Parameters:
      segmentId - the identifier of the segment to split
      Returns:
      a CompletableFuture providing the result of the split operation
    • mergeSegment

      public CompletableFuture<Boolean> mergeSegment(int segmentId)
      Description copied from interface: StreamingEventProcessor
      Instructs the processor to merge the segment with given segmentId back with the segment that it was originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore, this other segment must not have any active claims in the TokenStore.

      The processor must currently be actively processing the segment with given segmentId.

      Use StreamingEventProcessor.releaseSegment(int) to force this processor to release any claims with tokens required to merge the segments.

      To find out which segment a given segmentId should be merged with, use the following procedure:

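           EventTrackerStatus status = processor.processingStatus().get(segmentId);
           if (status == null) {
               // this processor is not processing segmentId, and will not be able to merge
               throw new IllegalStateException("Segment " + segmentId + " is not processed by this processor");
           }
           // the id of the segment that segmentId should be merged with
           return status.getSegment().mergeableSegmentId();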
       
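      As an intuition for which segment is the merge counterpart, the sketch below computes the sibling id in a simplified model where a segment is an id plus a bit mask and a split adds one bit to the mask. The model and the helper name are assumptions made for illustration; the framework's own computation may differ.

```java
/** Simplified, assumed model of finding the merge counterpart; not Axon Framework code. */
final class MergeableSegmentSketch {

    /** Returns the id of the sibling created by the same split, by flipping the bit that split added. */
    static int mergeableSegmentId(int segmentId, int mask) {
        int parentMask = mask >>> 1;
        int splitBit = mask ^ parentMask; // the highest bit of the mask, added by the last split
        return segmentId ^ splitBit;
    }

    public static void main(String[] args) {
        // Segments 1 and 3 (both with mask 3) originate from splitting segment 1 with mask 1.
        System.out.println(mergeableSegmentId(3, 3));
        System.out.println(mergeableSegmentId(1, 3));
    }
}
```
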
      Specified by:
      mergeSegment in interface StreamingEventProcessor
      Parameters:
      segmentId - the identifier of the segment to merge
      Returns:
      a CompletableFuture indicating whether the merge was executed successfully
    • supportsReset

      public boolean supportsReset()
      Description copied from interface: StreamingEventProcessor
      Indicates whether this StreamingEventProcessor supports a "reset". Generally, a reset is supported if at least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly prevent the resets.

      This method should be invoked prior to invoking any of the StreamingEventProcessor.resetTokens() operations as an early validation.

      Specified by:
      supportsReset in interface StreamingEventProcessor
      Returns:
      true if resets are supported, false otherwise
    • resetTokens

      public CompletableFuture<Void> resetTokens()
      Description copied from interface: StreamingEventProcessor
      Resets tokens to their initial state. This effectively causes a replay.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
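
      The order of operations for a replay can be sketched as follows. ResettableProcessor is a hypothetical stand-in that mirrors only the signatures documented on this page, and restarting after the reset is an assumption of the example rather than a requirement stated here. Other instances of the same logical processor in the cluster still have to be stopped separately.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Illustrative only: ResettableProcessor is a hypothetical stand-in, not an Axon Framework type. */
final class ReplaySketch {

    interface ResettableProcessor {
        boolean supportsReset();
        CompletableFuture<Void> shutdown();
        CompletableFuture<Void> resetTokens();
        CompletableFuture<Void> start();
    }

    /** Validates early, then stops the processor, resets its tokens, and starts it again. */
    static CompletableFuture<Void> replay(ResettableProcessor processor) {
        if (!processor.supportsReset()) {
            return CompletableFuture.failedFuture(new IllegalStateException("Reset is not supported"));
        }
        return processor.shutdown()
                        .thenCompose(ignored -> processor.resetTokens())
                        .thenCompose(ignored -> processor.start());
    }

    /** Exercises replay() against a stub that records the order of the calls. */
    static List<String> demo(boolean supportsReset) {
        List<String> calls = new ArrayList<>();
        ResettableProcessor stub = new ResettableProcessor() {
            @Override
            public boolean supportsReset() {
                return supportsReset;
            }

            @Override
            public CompletableFuture<Void> shutdown() {
                calls.add("shutdown");
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public CompletableFuture<Void> resetTokens() {
                calls.add("resetTokens");
                return CompletableFuture.completedFuture(null);
            }

            @Override
            public CompletableFuture<Void> start() {
                calls.add("start");
                return CompletableFuture.completedFuture(null);
            }
        };
        if (replay(stub).isCompletedExceptionally()) {
            calls.add("rejected");
        }
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```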

      Specified by:
      resetTokens in interface StreamingEventProcessor
    • resetTokens

      public <R> CompletableFuture<Void> resetTokens(R resetContext)
      Description copied from interface: StreamingEventProcessor
      Resets tokens to their initial state. This effectively causes a replay. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Specified by:
      resetTokens in interface StreamingEventProcessor
      Type Parameters:
      R - the type of the provided resetContext
      Parameters:
      resetContext - a R used to support the reset operation
    • resetTokens

      public CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource,CompletableFuture<TrackingToken>> initialTrackingTokenSupplier)
      Description copied from interface: StreamingEventProcessor
      Resets tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes a replay since that position.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Specified by:
      resetTokens in interface StreamingEventProcessor
      Parameters:
      initialTrackingTokenSupplier - a function returning the token representing the position to reset to
    • resetTokens

      public <R> CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource,CompletableFuture<TrackingToken>> initialTrackingTokenSupplier, R resetContext)
      Description copied from interface: StreamingEventProcessor
      Resets tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes a replay since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Specified by:
      resetTokens in interface StreamingEventProcessor
      Type Parameters:
      R - the type of the provided resetContext
      Parameters:
      initialTrackingTokenSupplier - a function returning the token representing the position to reset to
      resetContext - a R used to support the reset operation
    • resetTokens

      public CompletableFuture<Void> resetTokens(@Nonnull TrackingToken startPosition)
      Description copied from interface: StreamingEventProcessor
      Resets tokens to the given startPosition. This effectively causes a replay of events since that position.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Specified by:
      resetTokens in interface StreamingEventProcessor
      Parameters:
      startPosition - the token representing the position to reset the processor to
    • resetTokens

      public <R> CompletableFuture<Void> resetTokens(@Nonnull TrackingToken startPosition, R resetContext)
      Description copied from interface: StreamingEventProcessor
      Resets tokens to the given startPosition. This effectively causes a replay of events since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Specified by:
      resetTokens in interface StreamingEventProcessor
      Type Parameters:
      R - the type of the provided resetContext
      Parameters:
      startPosition - the token representing the position to reset the processor to
      resetContext - a R used to support the reset operation
    • maxCapacity

      public int maxCapacity()
      Specifies the maximum number of segments this EventProcessor can process at the same time.

      The maximum capacity of the PooledStreamingEventProcessor defaults to 32767. If required, this value can be adjusted through the PooledStreamingEventProcessorConfiguration.maxClaimedSegments(int) method.

      Specified by:
      maxCapacity in interface StreamingEventProcessor
      Returns:
      the maximum number of segments this EventProcessor can process at the same time
    • processingStatus

      public Map<Integer,EventTrackerStatus> processingStatus()
      Description copied from interface: StreamingEventProcessor
      Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. The keys of the Map represent the segment ids processed by this instance. The values of the returned Map represent the last known status of each segment.

      Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the processor is processing Events.
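
      A map that is unmodifiable yet reflects later changes behaves like a read-only view over a live map. The plain-Java sketch below shows that behaviour; whether the processor builds its Map this way is an assumption, and StatusViewSketch with its String statuses is invented for the example.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative model of an unmodifiable live view; not Axon Framework code. */
final class StatusViewSketch {

    // Internal, mutable state: segment id -> last known status (a String here for brevity).
    private final Map<Integer, String> statuses = new ConcurrentHashMap<>();
    // Read-only view over the same backing map, so later updates remain visible to callers.
    private final Map<Integer, String> view = Collections.unmodifiableMap(statuses);

    Map<Integer, String> processingStatus() {
        return view;
    }

    void update(int segmentId, String status) {
        statuses.put(segmentId, status);
    }

    public static void main(String[] args) {
        StatusViewSketch sketch = new StatusViewSketch();
        Map<Integer, String> status = sketch.processingStatus();
        sketch.update(0, "caught up");
        System.out.println(status.get(0)); // the update is visible through the earlier reference
    }
}
```

      Callers that need a stable snapshot, for example to log a consistent picture of all segments, can copy the view first.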

      Specified by:
      processingStatus in interface StreamingEventProcessor
      Returns:
      the status for each of the segments processed by the current processor
    • describeTo

      public void describeTo(@Nonnull ComponentDescriptor descriptor)
      Description copied from interface: DescribableComponent
      Describe the properties of this DescribableComponent with the given descriptor.

      Components should call the appropriate describeProperty methods on the descriptor to register their properties. The descriptor is responsible for determining how these properties are formatted and structured in the final output.

      Best Practices: As a general rule, all relevant fields of a DescribableComponent implementation should be described in this method. However, developers have discretion to include only the fields that make sense in the context. Not every field may be meaningful for description purposes, especially internal implementation details. Furthermore, components might want to expose different information based on their current state. The final decision on what properties to include lies with the person implementing the describeTo method, who should focus on providing information that is useful for understanding the component's configuration and state.

      Example implementation:

       public void describeTo(ComponentDescriptor descriptor) {
           descriptor.describeProperty("name", this.name);
           descriptor.describeProperty("enabled", this.enabled);
           descriptor.describeProperty("configuration", this.configuration); // A nested component
           descriptor.describeProperty("handlers", this.eventHandlers);      // A collection
       }
       
      Specified by:
      describeTo in interface DescribableComponent
      Parameters:
      descriptor - the component descriptor in which to describe this DescribableComponent and its properties