public interface StreamingEventProcessor extends EventProcessor
An EventProcessor which processes an event stream in segments. Segmenting the stream of events allows event
processing to be parallelized, which increases processing throughput.
A StreamingEventProcessor uses a TokenStore to store the progress of each of the segments it is
processing. Furthermore, it allows for segment interactions like:

- releaseSegment(int) - release a segment processed by this processor
- splitSegment(int) - increase the number of segments by splitting one into two
- mergeSegment(int) - decrease the number of segments by merging two segments into one
- resetTokens() - adjust the positions of all segments for this processor to the beginning of the event stream
- processingStatus() - return the EventTrackerStatus of every segment processed by this instance

| Modifier and Type | Method and Description |
|---|---|
| default CompletableFuture<Boolean> | claimSegment(int segmentId) - Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible. |
| String | getTokenStoreIdentifier() - Returns the unique identifier of the TokenStore used by this StreamingEventProcessor. |
| default boolean | isReplaying() - Returns the overall replay status of this StreamingEventProcessor. |
| int | maxCapacity() - Specifies the maximum number of segments this EventProcessor can process at the same time. |
| CompletableFuture<Boolean> | mergeSegment(int segmentId) - Instructs the processor to merge the segment with given segmentId back with the segment that it was originally split from. |
| Map<Integer,EventTrackerStatus> | processingStatus() - Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. |
| void | releaseSegment(int segmentId) - Instructs the processor to release the segment with given segmentId. |
| void | releaseSegment(int segmentId, long releaseDuration, TimeUnit unit) - Instructs the processor to release the segment with given segmentId. |
| void | resetTokens() - Resets tokens to their initial state. |
| void | resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier) - Resets tokens to the position as returned by the given initialTrackingTokenSupplier. |
| <R> void | resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, R resetContext) - Resets tokens to the position as returned by the given initialTrackingTokenSupplier. |
| <R> void | resetTokens(R resetContext) - Resets tokens to their initial state. |
| default void | resetTokens(TrackingToken startPosition) - Resets tokens to the given startPosition. |
| <R> void | resetTokens(TrackingToken startPosition, R resetContext) - Resets tokens to the given startPosition. |
| CompletableFuture<Boolean> | splitSegment(int segmentId) - Instructs the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it. |
| boolean | supportsReset() - Indicates whether this StreamingEventProcessor supports a "reset". |

Inherited methods: getHandlerInterceptors, getName, isError, isRunning, shutDown, shutdownAsync, start,
registerHandlerInterceptor

String getTokenStoreIdentifier()

Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.

Returns:
the unique identifier of the TokenStore used by this StreamingEventProcessor

Throws:
UnableToRetrieveIdentifierException - if the TokenStore was unable to retrieve it

void releaseSegment(int segmentId)

Instructs the processor to release the segment with given segmentId.

Parameters:
segmentId - the id of the segment to release

void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)

Instructs the processor to release the segment with given segmentId. This processor will not try to claim
the given segment for the specified releaseDuration in the given unit, to ensure it is not
immediately reclaimed. Note that this will override any previous release duration that existed for this segment.
Providing a negative value will allow the segment to be immediately claimed.

If the processor is not actively processing the segment with given segmentId, it will nonetheless refrain from
claiming that segment for the given timeframe.
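
The release-duration rules above (no claim attempts until the duration has passed, a later release overriding an earlier one, a negative duration allowing an immediate claim) can be pictured with a small bookkeeping model. This is only an illustrative sketch: the class `ReleaseDeadlines`, its methods, and the explicit `nowMillis` parameter are hypothetical names chosen for the example, and nothing here is taken from the processor's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Illustrative model of release-duration bookkeeping; NOT the library's implementation. */
final class ReleaseDeadlines {

    // segmentId -> timestamp (in millis) before which the segment must not be claimed
    private final Map<Integer, Long> deadlines = new HashMap<>();

    /** Records a release at time nowMillis; a later call for the same segment overrides the earlier one. */
    void release(int segmentId, long releaseDuration, TimeUnit unit, long nowMillis) {
        deadlines.put(segmentId, nowMillis + unit.toMillis(releaseDuration));
    }

    /** A segment may be claimed if it was never released or once its deadline has been reached. */
    boolean mayClaim(int segmentId, long nowMillis) {
        Long deadline = deadlines.get(segmentId);
        return deadline == null || nowMillis >= deadline;
    }
}
```

In this model a negative duration produces a deadline that already lies in the past, which is why the segment becomes claimable straight away.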

Parameters:
segmentId - the id of the segment to be released for the specified releaseDuration
releaseDuration - the amount of time to disregard segmentId for processing
unit - the unit of time used to express the releaseDuration

default CompletableFuture<Boolean> claimSegment(int segmentId)

Instructs the processor to claim the segment with given segmentId and start processing it as soon as
possible.

The given segmentId must not be currently processed by a different processor instance, as that will
have an active claim on the token. Claiming a segment that is already being processed will have no effect
and return true.

A true return value indicates that the segment has been claimed and will be processed by this processor.
The StreamingEventProcessor may postpone start of work until after completion of this task, as long as
the token has been claimed so work can be started. A return value of false indicates that the segment
has not been claimed due to the token for that segment not being available.
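
As a hedged usage sketch of the true/false contract described above, the helper below attempts to claim several segments and reports which claims succeeded. The class `SegmentClaims` and the `claimOperation` parameter are hypothetical; the operation is passed as a plain function so the example compiles without the library, and it is assumed that a call site would pass something like `processor::claimSegment`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

/** Hypothetical helper around a claim operation such as processor::claimSegment. */
final class SegmentClaims {

    /** Tries to claim each segment in order and returns the ids whose claim completed with true. */
    static List<Integer> claimAll(List<Integer> segmentIds,
                                  IntFunction<CompletableFuture<Boolean>> claimOperation) {
        List<Integer> claimed = new ArrayList<>();
        for (int segmentId : segmentIds) {
            // join() blocks until the claim attempt has finished and rethrows a failure
            // as CompletionException; false means the token was not available
            if (Boolean.TRUE.equals(claimOperation.apply(segmentId).join())) {
                claimed.add(segmentId);
            }
        }
        return claimed;
    }
}
```

Blocking with `join()` keeps the sketch short; composing the futures with `thenApply` or `thenCombine` would avoid blocking the calling thread.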

Parameters:
segmentId - the identifier of the segment to claim and start processing

Returns:
CompletableFuture providing the result of the claim operation

CompletableFuture<Boolean> splitSegment(int segmentId)

Instructs the processor to split the segment with given segmentId into two segments, allowing an
additional process to start processing events from it.

To be able to split segments, the TokenStore configured with this processor must use explicitly
initialized tokens. See TokenStore.requiresExplicitSegmentInitialization(). Also, the given segmentId must be
currently processed by a process owned by this processor instance.
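
The requirement that the segment is currently processed by this instance can be checked up front against the keys of processingStatus(). The following is a minimal sketch under that assumption: `SegmentSplitting` and `splitIfProcessed` are hypothetical names, and the split operation is passed as a function (for example `processor::splitSegment`) so the example stays self-contained. It does not check the TokenStore requirement.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

/** Hypothetical guard that only requests a split for a segment this instance is processing. */
final class SegmentSplitting {

    static CompletableFuture<Boolean> splitIfProcessed(Map<Integer, ?> processingStatus,
                                                       int segmentId,
                                                       IntFunction<CompletableFuture<Boolean>> splitOperation) {
        if (!processingStatus.containsKey(segmentId)) {
            // not processed by this instance, so skip the call and report "not split"
            return CompletableFuture.completedFuture(false);
        }
        return splitOperation.apply(segmentId);
    }
}
```

A call could look like `SegmentSplitting.splitIfProcessed(processor.processingStatus(), 0, processor::splitSegment)`.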

Parameters:
segmentId - the identifier of the segment to split

Returns:
CompletableFuture providing the result of the split operation

CompletableFuture<Boolean> mergeSegment(int segmentId)

Instructs the processor to merge the segment with given segmentId back with the segment that it was
originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore,
this other segment must not have any active claims in the TokenStore.

The processor must currently be actively processing the segment with given segmentId.

Use releaseSegment(int) to force this processor to release any claims with tokens required to merge the
segments.

To find out which segment a given segmentId should be merged with, use the following procedure:

```java
EventTrackerStatus status = processor.processingStatus().get(segmentId);
if (status == null) {
    // this processor is not processing segmentId, and will not be able to merge
    throw new IllegalStateException("Segment " + segmentId + " is not processed by this processor");
}
return status.getSegment().mergeableSegmentId();
```
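
To give some intuition for what a "mergeable" segment is, the sketch below models segments as an id plus a bit mask, where a value (for instance a hash of a routing key) belongs to the segment when its masked bits equal the id. This is an assumption made purely for illustration: `SegmentModel` is a hypothetical class, and the library's own Segment type may partition and pair segments differently.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified, hypothetical model of a mask-based segment; NOT the library's Segment class. */
final class SegmentModel {

    final int id;
    final int mask;

    SegmentModel(int id, int mask) {
        this.id = id;
        this.mask = mask;
    }

    /** A value belongs to this segment if its masked bits equal the segment id. */
    boolean matches(int value) {
        return (value & mask) == id;
    }

    /** Splits this segment into two halves that together cover exactly the same values. */
    List<SegmentModel> split() {
        int newMask = (mask << 1) | 1; // one more bit takes part in the comparison
        int newBit = mask + 1;         // the bit that tells the two halves apart
        return Arrays.asList(new SegmentModel(id, newMask), new SegmentModel(id | newBit, newMask));
    }

    /** Id of the other half this segment was split from; the root segment (mask 0) maps to itself. */
    int mergeableSegmentId() {
        return mask == 0 ? id : id ^ Integer.highestOneBit(mask);
    }
}
```

In this model, splitting the root segment 0 yields segments 0 and 1, and splitting segment 1 again yields segments 1 and 3, whose mergeable counterparts are each other.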

Parameters:
segmentId - the identifier of the segment to merge

Returns:
CompletableFuture indicating whether the merge was executed successfully

boolean supportsReset()

Indicates whether this StreamingEventProcessor supports a "reset". Generally, a reset is supported if at
least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly
prevent the resets.

This method should be invoked prior to invoking any of the resetTokens() operations as an early
validation.

Returns:
true if resets are supported, false otherwise

void resetTokens()

Resets tokens to their initial state.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same
logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a
processor can only reset the tokens if it is able to claim them all.
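
The order of operations implied here (validate with supportsReset(), stop the processor, reset, start again) can be sketched as follows. `ResetSketch` and `stopResetStart` are hypothetical names, and the four operations are passed in as functional arguments so the example compiles on its own; a call site is assumed to pass `processor::supportsReset`, `processor::shutDown`, `processor::resetTokens` and `processor::start`. The sketch only handles the local instance: other instances of the same logical processor in the cluster still have to be stopped separately.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper showing the stop, reset, start sequence for a single local instance. */
final class ResetSketch {

    /** Returns false without touching the processor if resets are not supported. */
    static boolean stopResetStart(BooleanSupplier supportsReset,
                                  Runnable shutDown,
                                  Runnable resetTokens,
                                  Runnable start) {
        if (!supportsReset.getAsBoolean()) {
            return false; // early validation, as recommended for supportsReset()
        }
        shutDown.run(); // the processor must be stopped so that all tokens can be claimed
        try {
            resetTokens.run();
        } finally {
            start.run(); // resume processing even if the reset failed
        }
        return true;
    }
}
```

Restarting in a `finally` block is a design choice of this sketch; a caller that prefers to leave the processor stopped after a failed reset would drop it.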

<R> void resetTokens(@Nullable R resetContext)

Resets tokens to their initial state. The given resetContext will be
used to support the (optional) reset operation in an Event Handling Component.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same
logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a
processor can only reset the tokens if it is able to claim them all.

Type Parameters:
R - the type of the provided resetContext

Parameters:
resetContext - an R used to support the reset operation

void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)

Resets tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes
a replay since that position.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same
logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a
processor can only reset the tokens if it is able to claim them all.

Parameters:
initialTrackingTokenSupplier - a function returning the token representing the position to reset to

<R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, @Nullable R resetContext)

Resets tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes
a replay since that position. The given resetContext will be used to support the (optional) reset
operation in an Event Handling Component.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same
logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a
processor can only reset the tokens if it is able to claim them all.

Type Parameters:
R - the type of the provided resetContext

Parameters:
initialTrackingTokenSupplier - a function returning the token representing the position to reset to
resetContext - an R used to support the reset operation

default void resetTokens(@Nonnull TrackingToken startPosition)

Resets tokens to the given startPosition. This effectively causes a replay of events since that
position.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same
logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a
processor can only reset the tokens if it is able to claim them all.

Parameters:
startPosition - the token representing the position to reset the processor to

<R> void resetTokens(@Nonnull TrackingToken startPosition, @Nullable R resetContext)

Resets tokens to the given startPosition. This effectively causes a replay of events since that position.
The given resetContext will be used to support the (optional) reset operation in an Event Handling
Component.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same
logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a
processor can only reset the tokens if it is able to claim them all.

Type Parameters:
R - the type of the provided resetContext

Parameters:
startPosition - the token representing the position to reset the processor to
resetContext - an R used to support the reset operation

int maxCapacity()

Specifies the maximum number of segments this EventProcessor can process at the same time.

Returns:
the maximum number of segments this EventProcessor can process at the same time

Map<Integer,EventTrackerStatus> processingStatus()

Returns the status for each of the segments processed by this processor as EventTrackerStatus instances.
The keys of the Map represent the segment ids processed by this instance. The values of the returned
Map represent the last known status of that segment.

Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the
processor is processing Events.
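
Because the returned Map keeps reflecting changes while the processor runs, code that inspects it over several steps may prefer to copy it first. The sketch below is a hypothetical helper, written against a generic status type so it compiles without the library; a call is assumed to look like `StatusSnapshot.segmentsMatching(processor.processingStatus(), EventTrackerStatus::isCaughtUp)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

/** Hypothetical helper that inspects a copy of the processing status map. */
final class StatusSnapshot {

    /** Returns, in ascending order, the ids of all segments whose status satisfies the condition. */
    static <S> List<Integer> segmentsMatching(Map<Integer, S> processingStatus, Predicate<S> condition) {
        // copy the entries so that later changes to the live map do not affect this evaluation
        Map<Integer, S> snapshot = new TreeMap<>(processingStatus);
        List<Integer> segmentIds = new ArrayList<>();
        for (Map.Entry<Integer, S> entry : snapshot.entrySet()) {
            if (condition.test(entry.getValue())) {
                segmentIds.add(entry.getKey());
            }
        }
        return segmentIds;
    }
}
```

The copy only fixes which segments are looked at; the status objects themselves are used as they are at the time the condition is evaluated.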

default boolean isReplaying()

Returns the overall replay status of this StreamingEventProcessor. Any other instances of this
streaming processor running on other applications are not taken into account in this calculation.

Note that when an EventTrackerStatus returns true for both EventTrackerStatus.isReplaying() and
EventTrackerStatus.isCaughtUp(), the replay is done but the processor has not handled any new events yet.
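
One way to read the overall replay status is as an aggregate over the per-segment statuses: a segment counts as still replaying only while it reports isReplaying() and has not yet caught up. The sketch below expresses that reading with a generic status type; `ReplayCheck` is a hypothetical helper and is not claimed to be how the default method is implemented. A call is assumed to look like `ReplayCheck.anySegmentReplaying(processor.processingStatus().values(), EventTrackerStatus::isReplaying, EventTrackerStatus::isCaughtUp)`.

```java
import java.util.Collection;
import java.util.function.Predicate;

/** Hypothetical aggregate over per-segment statuses; an interpretation, not the library's code. */
final class ReplayCheck {

    /** True if at least one status is replaying and has not caught up yet. */
    static <S> boolean anySegmentReplaying(Collection<S> statuses,
                                           Predicate<S> isReplaying,
                                           Predicate<S> isCaughtUp) {
        return statuses.stream().anyMatch(s -> isReplaying.test(s) && !isCaughtUp.test(s));
    }
}
```

With this reading, a segment that is both replaying and caught up does not keep the overall status at true, which matches the note that such a replay is already done.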

Returns:
true if any of the segments is still replaying and not caught up yet, false otherwise

Copyright © 2010–2025. All rights reserved.