Interface StreamingEventProcessor
- All Superinterfaces:
DescribableComponent, EventProcessor
- All Known Implementing Classes:
PooledStreamingEventProcessor
EventProcessor which processes an event stream in segments. Segmenting the stream of events allows for
parallelization of event processing, effectively enhancing the processing speed.
A StreamingEventProcessor uses a TokenStore to store the progress of each of the segments it is
processing. Furthermore, it allows for segment interactions like:
- releaseSegment(int) - release a segment processed by this processor
- splitSegment(int) - increase the number of segments by splitting one into two
- mergeSegment(int) - decrease the number of segments by merging two segments into one
- resetTokens() - adjust the positions of all segments for this processor to the beginning of the event stream
- processingStatus() - return the EventTrackerStatus of every segment processed by this instance
- Since:
- 4.5
- Author:
- Allard Buijze, Steven van Beelen
-
Method Summary
Modifier and Type, Method - Description
default CompletableFuture<Boolean> claimSegment(int segmentId) - Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible.
String getTokenStoreIdentifier() - Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.
default boolean isReplaying() - Returns the overall replay status of this StreamingEventProcessor.
int maxCapacity() - Specifies the maximum amount of segments this EventProcessor can process at the same time.
mergeSegment(int segmentId) - Instruct the processor to merge the segment with given segmentId back with the segment that it was originally split from.
Map<Integer,EventTrackerStatus> processingStatus() - Returns the status for each of the segments processed by this processor as EventTrackerStatus instances.
releaseSegment(int segmentId) - Instructs the processor to release the segment with given segmentId.
releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) - Instructs the processor to release the segment with given segmentId.
CompletableFuture<Void> resetTokens() - Resets tokens to their initial state.
CompletableFuture<Void> resetTokens(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier) - Resets tokens to the position returned by the given initialTrackingTokenSupplier.
<R> CompletableFuture<Void> resetTokens(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier, R resetContext) - Resets tokens to the position returned by the given initialTrackingTokenSupplier.
default CompletableFuture<Void> resetTokens(TrackingToken startPosition) - Resets tokens to the given startPosition.
<R> CompletableFuture<Void> resetTokens(TrackingToken startPosition, R resetContext) - Resets tokens to the given startPosition.
<R> CompletableFuture<Void> resetTokens(R resetContext) - Resets tokens to their initial state.
splitSegment(int segmentId) - Instruct the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it.
boolean supportsReset() - Indicates whether this StreamingEventProcessor supports a "reset".
Methods inherited from
interface org.axonframework.common.infra.DescribableComponent
describeTo
-
Method Details
-
getTokenStoreIdentifier
String getTokenStoreIdentifier()
Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.
- Returns:
- the unique identifier of the TokenStore used by this StreamingEventProcessor
- Throws:
- UnableToRetrieveIdentifierException - if the TokenStore was unable to retrieve it
-
releaseSegment
Instructs the processor to release the segment with given segmentId.
- Parameters:
- segmentId - the id of the segment to release
-
releaseSegment
Instructs the processor to release the segment with given segmentId. This processor will not try to claim the given segment for the specified releaseDuration in the given unit, to ensure it is not immediately reclaimed. Note that this will override any previous release duration that existed for this segment. Providing a negative value will allow the segment to be immediately claimed.
If the processor is not actively processing the segment with given segmentId, claiming it will be ignored for the given timeframe nonetheless.
- Parameters:
- segmentId - the id of the segment to be released for the specified releaseDuration
- releaseDuration - the amount of time to disregard segmentId for processing
- unit - the unit of time used to express the releaseDuration
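The release-duration rules described here (a later call overrides an earlier duration, and a negative value allows an immediate claim) can be pictured with a small bookkeeping sketch. `ReleaseLedger` and its methods are hypothetical names invented for this illustration. They are not part of the framework and make no claim about how an implementation actually tracks release deadlines.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not a framework type: remembers until when a segment must not be claimed.
class ReleaseLedger {
    private final Map<Integer, Long> releasedUntilMillis = new ConcurrentHashMap<>();

    // Records a release; a later call for the same segment overrides the earlier deadline.
    void release(int segmentId, long releaseDuration, TimeUnit unit, long nowMillis) {
        releasedUntilMillis.put(segmentId, nowMillis + unit.toMillis(releaseDuration));
    }

    // A segment may be claimed once its deadline has passed.
    // A negative duration yields a deadline in the past, so the claim is allowed immediately.
    boolean mayClaim(int segmentId, long nowMillis) {
        Long deadline = releasedUntilMillis.get(segmentId);
        return deadline == null || nowMillis >= deadline;
    }
}
```

The current time is passed in explicitly so the behaviour is easy to check without waiting for a real clock.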
-
claimSegment
Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible.
The given segmentId must not be currently processed by a different processor instance, as that will have an active claim on the token. Claiming a segment that is already being processed will have no effect and return true.
A true return value indicates that the segment has been claimed and will be processed by this processor. The StreamingEventProcessor may postpone start of work until after completion of this task, as long as the token has been claimed so work can be started. A return value of false indicates that the segment has not been claimed due to the token for that segment not being available.
- Parameters:
- segmentId - the identifier of the segment to claim and start processing
- Returns:
- a CompletableFuture providing the result of the claim operation
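As a minimal sketch of how a caller might interpret the claim result, the example below maps the two documented outcomes to a message. `SegmentClaimer` is a hypothetical stand-in that copies only the `claimSegment(int)` signature from the method summary, so the example compiles without the framework on the classpath. `ClaimExample` and `describeClaim` are likewise invented for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in exposing only the claimSegment signature from the method summary.
interface SegmentClaimer {
    CompletableFuture<Boolean> claimSegment(int segmentId);
}

class ClaimExample {
    // Maps the asynchronous claim result to a human-readable message.
    static CompletableFuture<String> describeClaim(SegmentClaimer processor, int segmentId) {
        return processor.claimSegment(segmentId)
                        .thenApply(claimed -> claimed
                                // true: the token was claimed, processing may start later
                                ? "segment " + segmentId + " claimed"
                                // false: the token for that segment was not available
                                : "segment " + segmentId + " not claimed, token unavailable");
    }
}
```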
-
splitSegment
Instruct the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it.
To be able to split segments, the TokenStore configured with this processor must use explicitly initialized tokens. Also, the given segmentId must be currently processed by a process owned by this processor instance.
- Parameters:
- segmentId - the identifier of the segment to split
- Returns:
- a CompletableFuture providing the result of the split operation
-
mergeSegment
Instruct the processor to merge the segment with given segmentId back with the segment that it was originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore, this other segment must not have any active claims in the TokenStore.
The processor must currently be actively processing the segment with given segmentId.
Use releaseSegment(int) to force this processor to release any claims with tokens required to merge the segments.
To find out which segment a given segmentId should be merged with, use the following procedure:
    EventTrackerStatus status = processor.processingStatus().get(segmentId);
    if (status == null) {
        // this processor is not processing segmentId, and will not be able to merge
    }
    return status.getSegment().mergeableSegmentId();
- Parameters:
- segmentId - the identifier of the segment to merge
- Returns:
- a CompletableFuture indicating whether the merge was executed successfully
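The lookup procedure above does not say what to return when the status is missing. One possible null-safe variant is sketched below. `SegmentView` and `StatusView` are hypothetical stand-ins that mirror only the `getSegment()` and `mergeableSegmentId()` calls used in the procedure, and `MergeTargetLookup` is an invented helper name.

```java
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical stand-ins mirroring only the calls used in the documented procedure.
interface SegmentView {
    int mergeableSegmentId();
}

interface StatusView {
    SegmentView getSegment();
}

class MergeTargetLookup {
    // Returns the id of the merge counterpart, or empty when this processor
    // is not processing segmentId and therefore cannot merge it.
    static OptionalInt mergeTarget(Map<Integer, ? extends StatusView> processingStatus, int segmentId) {
        StatusView status = processingStatus.get(segmentId);
        if (status == null) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(status.getSegment().mergeableSegmentId());
    }
}
```

Returning an `OptionalInt` makes the "not processed here" case explicit instead of letting it surface as a `NullPointerException`.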
-
supportsReset
boolean supportsReset()
Indicates whether this StreamingEventProcessor supports a "reset". Generally, a reset is supported if at least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly prevent the resets.
This method should be invoked prior to invoking any of the resetTokens() operations as an early validation.
- Returns:
- true if resets are supported, false otherwise
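The early validation recommended here could look roughly like the guard below. `Resettable` is a hypothetical stand-in carrying only the `supportsReset()` and `resetTokens()` signatures quoted in this document, and `ResetGuard` is an invented helper. The sketch assumes the caller has already stopped the processor and all other instances of it, as the resetTokens descriptions require.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in with the two signatures quoted in this document.
interface Resettable {
    boolean supportsReset();

    CompletableFuture<Void> resetTokens();
}

class ResetGuard {
    // Triggers the reset only when the processor reports support for it;
    // otherwise returns an exceptionally completed future without calling resetTokens().
    static CompletableFuture<Void> resetIfSupported(Resettable processor) {
        if (!processor.supportsReset()) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException("processor does not support a reset"));
        }
        return processor.resetTokens();
    }
}
```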
-
resetTokens
CompletableFuture<Void> resetTokens()
Resets tokens to their initial state. This effectively causes a replay.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
-
resetTokens
Resets tokens to their initial state. This effectively causes a replay. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Type Parameters:
- R - the type of the provided resetContext
- Parameters:
- resetContext - a R used to support the reset operation
-
resetTokens
CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier)
Resets tokens to the position returned by the given initialTrackingTokenSupplier. This effectively causes a replay of events from that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Parameters:
- initialTrackingTokenSupplier - a function returning the token representing the position to reset to
-
resetTokens
<R> CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialTrackingTokenSupplier, @Nullable R resetContext)
Resets tokens to the position returned by the given initialTrackingTokenSupplier. This effectively causes a replay of events from that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Type Parameters:
- R - the type of the provided resetContext
- Parameters:
- initialTrackingTokenSupplier - a function returning the token representing the position to reset to
- resetContext - a R used to support the reset operation
-
resetTokens
Resets tokens to the given startPosition. This effectively causes a replay of events since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Parameters:
- startPosition - the token representing the position to reset the processor to
-
resetTokens
<R> CompletableFuture<Void> resetTokens(@Nonnull TrackingToken startPosition, @Nullable R resetContext)
Resets tokens to the given startPosition. This effectively causes a replay of events since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
- Type Parameters:
- R - the type of the provided resetContext
- Parameters:
- startPosition - the token representing the position to reset the processor to
- resetContext - a R used to support the reset operation
-
maxCapacity
int maxCapacity()
Specifies the maximum amount of segments this EventProcessor can process at the same time.
- Returns:
- the maximum amount of segments this EventProcessor can process at the same time
-
processingStatus
Map<Integer,EventTrackerStatus> processingStatus()
Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. The keys of the Map represent the segment ids processed by this instance. The values of the returned Map represent the last known status of each segment.
Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the processor is processing Events.
- Returns:
- the status for each of the segments processed by the current processor
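An unmodifiable map that still reflects later changes is what the JDK calls an unmodifiable view. The sketch below shows one way to obtain such semantics and is not a description of how any implementing class builds its map. `LiveStatusView` is a hypothetical class, and a plain `String` stands in for the status value.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical illustration of a read-only, live view on status data.
class LiveStatusView {
    // Internal, mutable state that the owner updates while processing.
    private final Map<Integer, String> statuses = new ConcurrentHashMap<>();
    // Read-only wrapper around the same map; reads see every later update.
    private final Map<Integer, String> readOnly = Collections.unmodifiableMap(statuses);

    void update(int segmentId, String status) {
        statuses.put(segmentId, status);
    }

    Map<Integer, String> processingStatus() {
        return readOnly;
    }
}
```

A caller holding the returned map sees updates made afterwards, while any attempt to modify it throws `UnsupportedOperationException`.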
-
isReplaying
default boolean isReplaying()
Returns the overall replay status of this StreamingEventProcessor. Any other instances of this streaming processor running on other applications are not taken into account in this calculation.
Note that when an EventTrackerStatus returns true for both EventTrackerStatus.isReplaying() and EventTrackerStatus.isCaughtUp(), the replay is done but the processor has not handled any new events yet.
- Returns:
- true if any of the segments is still replaying and not caught up yet, false otherwise
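The documented return value can be read as an "any segment" check over the per-segment flags. The sketch below follows that reading. `SegmentFlags` is a hypothetical value class holding just the two flags named above, and `ReplayStatus` is an invented helper, so this illustrates the rule rather than the actual default implementation.

```java
import java.util.Collection;

// Hypothetical value class carrying only the two flags named in the description.
final class SegmentFlags {
    private final boolean replaying;
    private final boolean caughtUp;

    SegmentFlags(boolean replaying, boolean caughtUp) {
        this.replaying = replaying;
        this.caughtUp = caughtUp;
    }

    boolean isReplaying() {
        return replaying;
    }

    boolean isCaughtUp() {
        return caughtUp;
    }
}

class ReplayStatus {
    // True if at least one segment is replaying and has not caught up yet.
    // A segment that is replaying AND caught up counts as finished.
    static boolean isReplaying(Collection<SegmentFlags> segmentStatuses) {
        return segmentStatuses.stream()
                              .anyMatch(s -> s.isReplaying() && !s.isCaughtUp());
    }
}
```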
-