Class PooledStreamingEventProcessor
- All Implemented Interfaces:
DescribableComponent,EventProcessor,StreamingEventProcessor
StreamingEventProcessor implementation which pools its resources to enhance processing speed. It utilizes a
Coordinator as the means to stream events from a StreamableEventSource and creates so-called work
packages. Every work package is in charge of a Segment of the entire event stream. It is the
Coordinator's job to retrieve the events from the source and provide the events to all the work packages it
is in charge of.
This approach utilizes two threads pools. One to retrieve the events to provide them to the work packages and another to actual handle the events. Respectively, the coordinator thread pool and the work package thread pool. It is this approach which allows for greater parallelization and processing speed than the TrackingEventProcessor (removed in 5.0.0).
If no TrackingTokens are present for this processor, the PooledStreamingEventProcessor will
initialize them in a given segment count. By default, it will create 16 segments, which can be configured
through the PooledStreamingEventProcessorConfiguration.initialSegmentCount(int).
- Since:
- 4.5.0
- Author:
- Allard Buijze, Steven van Beelen
-
Constructor Summary
ConstructorsConstructorDescriptionPooledStreamingEventProcessor(String name, List<EventHandlingComponent> eventHandlingComponents, PooledStreamingEventProcessorConfiguration configuration) Instantiate aPooledStreamingEventProcessorwith givenname,eventHandlingComponentsand based on the fields contained in thePooledStreamingEventProcessorConfiguration. -
Method Summary
Modifier and TypeMethodDescriptionclaimSegment(int segmentId) Instructs the processor to claim the segment with givensegmentIdand start processing it as soon as possible.voiddescribeTo(ComponentDescriptor descriptor) Describe the properties ofthis DescribableComponentwith the givendescriptor.Returns the unique identifier of theTokenStoreused by thisStreamingEventProcessor.booleanisError()Indicates whether the processor has been shut down due to an error.booleanIndicates whether this processor is currently running (i.e. consuming events from its message source).intSpecifies the maximum amount of segments thisEventProcessorcan process at the same time.mergeSegment(int segmentId) Instruct the processor to merge the segment with givensegmentIdback with the segment that it was originally split from.name()Returns the name of this event processor.Returns the status for each of the segments processed by this processor asEventTrackerStatusinstances.releaseSegment(int segmentId) Instructs the processor to release the segment with givensegmentId.releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) Instructs the processor to release the segment with givensegmentId.Resets tokens to their initial state.resetTokens(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier) Reset tokens to the position as return by the giveninitialTrackingTokenSupplier.<R> CompletableFuture<Void> resetTokens(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier, R resetContext) Reset tokens to the position as return by the giveninitialTrackingTokenSupplier.resetTokens(TrackingToken startPosition) Resets tokens to the givenstartPosition.<R> CompletableFuture<Void> resetTokens(TrackingToken startPosition, R resetContext) Resets tokens to the givenstartPosition.<R> CompletableFuture<Void> resetTokens(R resetContext) Resets tokens to their initial state.shutdown()Initiates a shutdown, providing aCompletableFuturethat completes when the shutdown process is finished.splitSegment(int segmentId) Instruct the processor to split the segment with givensegmentIdinto two segments, allowing an additional process to start processing events from it.start()Initiates a start, providing aCompletableFuturethat completes when the start process is finished.booleanIndicates whether thisStreamingEventProcessorsupports a "reset".Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.axonframework.messaging.eventhandling.processing.streaming.StreamingEventProcessor
isReplaying
-
Constructor Details
-
PooledStreamingEventProcessor
public PooledStreamingEventProcessor(@Nonnull String name, @Nonnull List<EventHandlingComponent> eventHandlingComponents, @Nonnull PooledStreamingEventProcessorConfiguration configuration) Instantiate aPooledStreamingEventProcessorwith givenname,eventHandlingComponentsand based on the fields contained in thePooledStreamingEventProcessorConfiguration.Will assert the following for their presence in the configuration, prior to constructing this processor:
- A
StreamableEventSource. - A
TokenStore. - A
UnitOfWorkFactory. - A
ScheduledExecutorServicefor coordination. - A
ScheduledExecutorServiceto process work packages.
AxonConfigurationExceptionis thrown.- Parameters:
name- AStringdefining thisEventProcessorinstance.eventHandlingComponents- TheEventHandlingComponents which will handle all the individualEventMessages.configuration- ThePooledStreamingEventProcessorConfigurationused to configure aPooledStreamingEventProcessorinstance.
- A
-
-
Method Details
-
name
Description copied from interface:EventProcessorReturns the name of this event processor. This name is used to detect distributed instances of the same event processor. Multiple instances referring to the same logical event processor (on different JVM's) must have the same name.- Specified by:
namein interfaceEventProcessor- Returns:
- the name of this event processor
-
start
Description copied from interface:EventProcessorInitiates a start, providing aCompletableFuturethat completes when the start process is finished.- Specified by:
startin interfaceEventProcessor- Returns:
- a CompletableFuture that completes when the start process is finished.
-
shutdown
Description copied from interface:EventProcessorInitiates a shutdown, providing aCompletableFuturethat completes when the shutdown process is finished.- Specified by:
shutdownin interfaceEventProcessor- Returns:
- a CompletableFuture that completes when the shutdown process is finished.
-
isRunning
public boolean isRunning()Description copied from interface:EventProcessorIndicates whether this processor is currently running (i.e. consuming events from its message source).- Specified by:
isRunningin interfaceEventProcessor- Returns:
truewhen running, otherwisefalse
-
isError
public boolean isError()Description copied from interface:EventProcessorIndicates whether the processor has been shut down due to an error. In such case, the processor has forcefully shut down, as it wasn't able to automatically recover.Note that this method returns
falsewhen the processor was stopped usingEventProcessor.shutdown().- Specified by:
isErrorin interfaceEventProcessor- Returns:
truewhen paused due to an error, otherwisefalse
-
getTokenStoreIdentifier
Description copied from interface:StreamingEventProcessorReturns the unique identifier of theTokenStoreused by thisStreamingEventProcessor.- Specified by:
getTokenStoreIdentifierin interfaceStreamingEventProcessor- Returns:
- the unique identifier of the
TokenStoreused by thisStreamingEventProcessor
-
releaseSegment
Description copied from interface:StreamingEventProcessorInstructs the processor to release the segment with givensegmentId.- Specified by:
releaseSegmentin interfaceStreamingEventProcessor- Parameters:
segmentId- the id of the segment to release
-
releaseSegment
Description copied from interface:StreamingEventProcessorInstructs the processor to release the segment with givensegmentId. This processor will not try to claim the given segment for the specifiedreleaseDurationin the givenunit, to ensure it is not immediately reclaimed. Note that this will override any previous release duration that existed for this segment. Providing a negative value will allow the segment to be immediately claimed.If the processor is not actively processing the segment with given
segmentId, claiming it will be ignored for the given timeframe nonetheless.- Specified by:
releaseSegmentin interfaceStreamingEventProcessor- Parameters:
segmentId- the id of the segment to be released for the specifiedreleaseDurationreleaseDuration- the amount of time to disregardsegmentIdfor processingunit- the unit of time used to express thereleaseDuration
-
claimSegment
Description copied from interface:StreamingEventProcessorInstructs the processor to claim the segment with givensegmentIdand start processing it as soon as possible.The given
segmentIdmust not be currently processed by a different processor instance, as that will have an active claim on the token. Claiming a segment that is already being processed will have no effect and returntrue.A
truereturn value indicates that the segment has been claimed and will be processed by this processor. TheStreamingEventProcessormay postpone start of work until after completion of this task, as long as the token has been claimed so work can be started. A return value offalseindicates that the segment has not been claimed due to the token for that segment not being available.- Specified by:
claimSegmentin interfaceStreamingEventProcessor- Parameters:
segmentId- the identifier of the segment to claim and start processing- Returns:
- a
CompletableFutureproviding the result of the claim operation
-
splitSegment
Description copied from interface:StreamingEventProcessorInstruct the processor to split the segment with givensegmentIdinto two segments, allowing an additional process to start processing events from it.To be able to split segments, the
TokenStoreconfigured with this processor must use explicitly initialized tokens. Also, the givensegmentIdmust be currently processed by a process owned by this processor instance.- Specified by:
splitSegmentin interfaceStreamingEventProcessor- Parameters:
segmentId- the identifier of the segment to split- Returns:
- a
CompletableFutureproviding the result of the split operation
-
mergeSegment
Description copied from interface:StreamingEventProcessorInstruct the processor to merge the segment with givensegmentIdback with the segment that it was originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore, this other segment must not have any active claims in theTokenStore.The processor must currently be actively processing the segment with given
segmentId.Use
StreamingEventProcessor.releaseSegment(int)to force this processor to release any claims with tokens required to merge the segments.To find out which segment a given
segmentIdshould be merged with, use the following procedure:EventTrackerStatus status = processor.processingStatus().get(segmentId); if (status == null) { // this processor is not processing segmentId, and will not be able to merge } return status.getSegment().mergeableSegmentId();- Specified by:
mergeSegmentin interfaceStreamingEventProcessor- Parameters:
segmentId- the identifier of the segment to merge- Returns:
- a
CompletableFutureindicating whether the merge was executed successfully
-
supportsReset
public boolean supportsReset()Description copied from interface:StreamingEventProcessorIndicates whether thisStreamingEventProcessorsupports a "reset". Generally, a reset is supported if at least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly prevent the resets.This method should be invoked prior to invoking any of the
StreamingEventProcessor.resetTokens()operations as an early validation.- Specified by:
supportsResetin interfaceStreamingEventProcessor- Returns:
trueif resets are supported,falseotherwise
-
resetTokens
Description copied from interface:StreamingEventProcessorResets tokens to their initial state. This effectively causes a replay.Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Specified by:
resetTokensin interfaceStreamingEventProcessor
-
resetTokens
Description copied from interface:StreamingEventProcessorResets tokens to their initial state. This effectively causes a replay. The givenresetContextwill be used to support the (optional) reset operation in an Event Handling Component.Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Specified by:
resetTokensin interfaceStreamingEventProcessor- Type Parameters:
R- the type of the providedresetContext- Parameters:
resetContext- aRused to support the reset operation
-
resetTokens
public CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier) Description copied from interface:StreamingEventProcessorReset tokens to the position as return by the giveninitialTrackingTokenSupplier. This effectively causes a replay since that position.Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Specified by:
resetTokensin interfaceStreamingEventProcessor- Parameters:
initialTrackingTokenSupplier- a function returning the token representing the position to reset to
-
resetTokens
public <R> CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier, R resetContext) Description copied from interface:StreamingEventProcessorReset tokens to the position as return by the giveninitialTrackingTokenSupplier. This effectively causes a replay since that position. The givenresetContextwill be used to support the (optional) reset operation in an Event Handling Component.Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Specified by:
resetTokensin interfaceStreamingEventProcessor- Type Parameters:
R- the type of the providedresetContext- Parameters:
initialTrackingTokenSupplier- a function returning the token representing the position to reset toresetContext- aRused to support the reset operation
-
resetTokens
Description copied from interface:StreamingEventProcessorResets tokens to the givenstartPosition. This effectively causes a replay of events since that position.Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Specified by:
resetTokensin interfaceStreamingEventProcessor- Parameters:
startPosition- the token representing the position to reset the processor to
-
resetTokens
public <R> CompletableFuture<Void> resetTokens(@Nonnull TrackingToken startPosition, R resetContext) Description copied from interface:StreamingEventProcessorResets tokens to the givenstartPosition. This effectively causes a replay of events since that position. The givenresetContextwill be used to support the (optional) reset operation in an Event Handling Component.Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Specified by:
resetTokensin interfaceStreamingEventProcessor- Type Parameters:
R- the type of the providedresetContext- Parameters:
startPosition- the token representing the position to reset the processor toresetContext- aRused to support the reset operation
-
maxCapacity
public int maxCapacity()Specifies the maximum amount of segments thisEventProcessorcan process at the same time.The maximum capacity of the
PooledStreamingEventProcessordefaults to 32767. If required, this value can be adjusted through thePooledStreamingEventProcessorConfiguration.maxClaimedSegments(int)method.- Specified by:
maxCapacityin interfaceStreamingEventProcessor- Returns:
- the maximum amount of segments this
EventProcessorcan process at the same time
-
processingStatus
Description copied from interface:StreamingEventProcessorReturns the status for each of the segments processed by this processor asEventTrackerStatusinstances. The key of theMaprepresent the segment ids processed by this instance. The values of the returnedMaprepresent the last known status of that segment.Note that the returned
Mapis unmodifiable, but does reflect any changes made to the status as the processor is processing Events.- Specified by:
processingStatusin interfaceStreamingEventProcessor- Returns:
- the status for each of the segments processed by the current processor
-
describeTo
Description copied from interface:DescribableComponentDescribe the properties ofthis DescribableComponentwith the givendescriptor.Components should call the appropriate
describePropertymethods on the descriptor to register their properties. The descriptor is responsible for determining how these properties are formatted and structured in the final output.Best Practices: As a general rule, all relevant fields of a
DescribableComponentimplementation should be described in this method. However, developers have discretion to include only the fields that make sense in the context. Not every field may be meaningful for description purposes, especially internal implementation details. Furthermore, components might want to expose different information based on their current state. The final decision on what properties to include lies with the person implementing thedescribeTomethod, who should focus on providing information that is useful for understanding the component's configuration and state.Example implementation:
public void describeTo(ComponentDescriptor descriptor) { descriptor.describeProperty("name", this.name); descriptor.describeProperty("enabled", this.enabled); descriptor.describeProperty("configuration", this.configuration); // A nested component descriptor.describeProperty("handlers", this.eventHandlers); // A collection }- Specified by:
describeToin interfaceDescribableComponent- Parameters:
descriptor- The component descriptor to describethis DescribableComponentn its properties in.
-