public interface StreamingEventProcessor extends EventProcessor

An EventProcessor which processes an event stream in segments. Segmenting the stream of events allows for parallelization of event processing, effectively enhancing the processing speed.

A StreamingEventProcessor uses a TokenStore to store the progress of each of the segments it is processing. Furthermore, it allows for segment interactions like:

- releaseSegment(int) - release a segment processed by this processor
- splitSegment(int) - increase the number of segments by splitting one into two
- mergeSegment(int) - decrease the number of segments by merging two segments into one
- resetTokens() - adjust the positions of all segments for this processor to the beginning of the event stream
- processingStatus() - return the EventTrackerStatus
  of every segment processed by this instance

Modifier and Type | Method and Description |
---|---|
default CompletableFuture<Boolean> | claimSegment(int segmentId): Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible. |
String | getTokenStoreIdentifier(): Returns the unique identifier of the TokenStore used by this StreamingEventProcessor. |
default boolean | isReplaying(): Returns the overall replay status of this StreamingEventProcessor. |
int | maxCapacity(): Specifies the maximum number of segments this EventProcessor can process at the same time. |
CompletableFuture<Boolean> | mergeSegment(int segmentId): Instructs the processor to merge the segment with given segmentId back with the segment that it was originally split from. |
Map<Integer,EventTrackerStatus> | processingStatus(): Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. |
void | releaseSegment(int segmentId): Instructs the processor to release the segment with given segmentId. |
void | releaseSegment(int segmentId, long releaseDuration, TimeUnit unit): Instructs the processor to release the segment with given segmentId. |
void | resetTokens(): Resets tokens to their initial state. |
void | resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier): Resets tokens to the position as returned by the given initialTrackingTokenSupplier. |
<R> void | resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, R resetContext): Resets tokens to the position as returned by the given initialTrackingTokenSupplier. |
<R> void | resetTokens(R resetContext): Resets tokens to their initial state. |
default void | resetTokens(TrackingToken startPosition): Resets tokens to the given startPosition. |
<R> void | resetTokens(TrackingToken startPosition, R resetContext): Resets tokens to the given startPosition. |
CompletableFuture<Boolean> | splitSegment(int segmentId): Instructs the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it. |
boolean | supportsReset(): Indicates whether this StreamingEventProcessor supports a "reset". |
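Methods inherited from interface EventProcessor: getHandlerInterceptors, getName, isError, isRunning, shutDown, shutdownAsync, start

Methods inherited from interface MessageHandlerInterceptorSupport: registerHandlerInterceptor
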
String getTokenStoreIdentifier()

Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.

Returns: the unique identifier of the TokenStore used by this StreamingEventProcessor

Throws: UnableToRetrieveIdentifierException - if the TokenStore was unable to retrieve it

void releaseSegment(int segmentId)
Instructs the processor to release the segment with given segmentId.

Parameters:
- segmentId - the id of the segment to release

void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)
Instructs the processor to release the segment with given segmentId. This processor will not try to claim the given segment for the specified releaseDuration in the given unit, to ensure it is not immediately reclaimed. Note that this will override any previous release duration that existed for this segment. Providing a negative value will allow the segment to be immediately claimed.

If the processor is not actively processing the segment with given segmentId, it will still refrain from claiming that segment for the given timeframe.

Parameters:
- segmentId - the id of the segment to be released for the specified releaseDuration
- releaseDuration - the amount of time to disregard segmentId for processing
- unit - the unit of time used to express the releaseDuration

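As an illustration of the timed release, the following minimal sketch hands a segment over for 30 seconds so that another instance gets a chance to claim it. The `ReleasableProcessor` interface and the `ReleaseExample` class are hypothetical stand-ins that mirror only the `releaseSegment(int, long, TimeUnit)` signature, so the snippet compiles without the framework on the classpath; in real code the parameter would be a StreamingEventProcessor.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in (not part of the framework): exposes only the
// method used in this sketch, with the same signature as documented above.
interface ReleasableProcessor {
    void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit);
}

class ReleaseExample {

    // Releases the segment and asks this instance not to reclaim it
    // for 30 seconds. The duration is an arbitrary value for illustration.
    static void handOver(ReleasableProcessor processor, int segmentId) {
        processor.releaseSegment(segmentId, 30, TimeUnit.SECONDS);
    }
}
```

Calling the same method again for the same segment replaces the earlier release duration, as noted in the description.
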
default CompletableFuture<Boolean> claimSegment(int segmentId)
Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible.

The given segmentId must not be currently processed by a different processor instance, as that will have an active claim on the token. Claiming a segment that is already being processed will have no effect and return true.

A true return value indicates that the segment has been claimed and will be processed by this processor. The StreamingEventProcessor may postpone start of work until after completion of this task, as long as the token has been claimed so work can be started. A return value of false indicates that the segment has not been claimed due to the token for that segment not being available.

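The two outcomes described above can be handled without blocking by chaining on the returned future. This is a minimal sketch: `ClaimingProcessor` and `ClaimExample` are hypothetical names, and the stand-in interface mirrors only the `claimSegment(int)` signature so the snippet compiles on its own.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in (not part of the framework): mirrors only
// claimSegment(int) as documented above.
interface ClaimingProcessor {
    CompletableFuture<Boolean> claimSegment(int segmentId);
}

class ClaimExample {

    // Maps the claim result to a readable message once the future completes.
    static CompletableFuture<String> claimAndDescribe(ClaimingProcessor processor, int segmentId) {
        return processor.claimSegment(segmentId)
                .thenApply(claimed -> claimed
                        // true: the token is claimed; processing may start slightly later
                        ? "segment " + segmentId + " claimed"
                        // false: the token for this segment was not available
                        : "segment " + segmentId + " not available");
    }
}
```
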
Parameters:
- segmentId - the identifier of the segment to claim and start processing

Returns: a CompletableFuture providing the result of the claim operation

CompletableFuture<Boolean> splitSegment(int segmentId)
Instructs the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it.

To be able to split segments, the TokenStore configured with this processor must use explicitly initialized tokens. See TokenStore.requiresExplicitSegmentInitialization(). Also, the given segmentId must be currently processed by a process owned by this processor instance.

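One way to respect the ownership precondition is to check the processing status before requesting the split. The sketch below assumes that the keys of `processingStatus()` are the segment ids processed by this instance. `SplittableProcessor` and `SplitExample` are hypothetical names, and the stand-in interface simplifies the status value type to a wildcard. The TokenStore precondition is not checked here.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in (not part of the framework): mirrors the two
// methods used below; the status value type is simplified to a wildcard.
interface SplittableProcessor {
    Map<Integer, ?> processingStatus();

    CompletableFuture<Boolean> splitSegment(int segmentId);
}

class SplitExample {

    // Requests a split only when this instance is processing the segment;
    // otherwise completes immediately with false.
    static CompletableFuture<Boolean> splitIfOwned(SplittableProcessor processor, int segmentId) {
        if (!processor.processingStatus().containsKey(segmentId)) {
            return CompletableFuture.completedFuture(false);
        }
        return processor.splitSegment(segmentId);
    }
}
```
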
Parameters:
- segmentId - the identifier of the segment to split

Returns: a CompletableFuture providing the result of the split operation

CompletableFuture<Boolean> mergeSegment(int segmentId)
Instructs the processor to merge the segment with given segmentId back with the segment that it was originally split from. The processor must be able to claim the other segment in order to merge it. Therefore, this other segment must not have any active claims in the TokenStore.

The processor must currently be actively processing the segment with given segmentId.

Use releaseSegment(int) to force this processor to release any claims with tokens required to merge the segments.

To find out which segment a given segmentId should be merged with, use the following procedure:

    EventTrackerStatus status = processor.processingStatus().get(segmentId);
    if (status == null) {
        // this processor is not processing segmentId, and will not be able to merge
    }
    return status.getSegment().mergeableSegmentId();

Parameters:
- segmentId - the identifier of the segment to merge

Returns: a CompletableFuture indicating whether the merge was executed successfully

boolean supportsReset()
Indicates whether this StreamingEventProcessor supports a "reset". Generally, a reset is supported if at least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly prevent the resets.

This method should be invoked prior to invoking any of the resetTokens() operations as an early validation.

Returns: true if resets are supported, false otherwise

void resetTokens()
Resets tokens to their initial state.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

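The order of operations described here (validate with supportsReset(), stop, reset, start again) can be sketched as follows. `ResettableProcessor` and `ResetExample` are hypothetical names; the stand-in interface mirrors only the four methods used, including the inherited shutDown() and start(). The sketch covers a single instance only: stopping other instances of the same logical processor in the cluster is left to the caller.

```java
// Hypothetical stand-in (not part of the framework): mirrors only the
// methods used in this sketch.
interface ResettableProcessor {
    boolean supportsReset();

    void shutDown();

    void resetTokens();

    void start();
}

class ResetExample {

    // Returns false without touching the processor when resets are not
    // supported; otherwise stops it, resets the tokens and starts it again.
    static boolean replayFromStart(ResettableProcessor processor) {
        if (!processor.supportsReset()) {
            return false;
        }
        processor.shutDown();
        try {
            processor.resetTokens();
        } finally {
            // Restart even if the reset failed, so processing resumes.
            // Whether that is desirable is a design choice of this sketch.
            processor.start();
        }
        return true;
    }
}
```
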
<R> void resetTokens(@Nullable R resetContext)
Resets tokens to their initial state. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

Type Parameters:
- R - the type of the provided resetContext

Parameters:
- resetContext - an R used to support the reset operation

void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)
Resets tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes a replay since that position.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

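To illustrate the supplier-based variant, the sketch below resets to a token created for a point in time. All type names (`ReplayToken`, `ReplaySource`, `SupplierResettable`, `ResetSinceExample`) are hypothetical stand-ins so the snippet compiles on its own. The `createTokenAt(Instant)` method is an assumption about what the message source offers and should be checked against the actual StreamableMessageSource API before use.

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical stand-ins (not part of the framework).
interface ReplayToken {
}

interface ReplaySource {
    // Assumed factory method: a token for the position at the given time.
    ReplayToken createTokenAt(Instant dateTime);
}

interface SupplierResettable {
    void resetTokens(Function<ReplaySource, ReplayToken> initialTrackingTokenSupplier);
}

class ResetSinceExample {

    // Resets the tokens to the position the source reports for 'since'.
    // The processor must already be stopped, and 'since' must lie before
    // the processor's current position.
    static void replaySince(SupplierResettable processor, Instant since) {
        processor.resetTokens(source -> source.createTokenAt(since));
    }
}
```
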
Parameters:
- initialTrackingTokenSupplier - a function returning the token representing the position to reset to

<R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, @Nullable R resetContext)
Resets tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes a replay since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

Type Parameters:
- R - the type of the provided resetContext

Parameters:
- initialTrackingTokenSupplier - a function returning the token representing the position to reset to
- resetContext - an R used to support the reset operation

default void resetTokens(@Nonnull TrackingToken startPosition)
Resets tokens to the given startPosition. This effectively causes a replay of events since that position.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

Parameters:
- startPosition - the token representing the position to reset the processor to

<R> void resetTokens(@Nonnull TrackingToken startPosition, @Nullable R resetContext)
Resets tokens to the given startPosition. This effectively causes a replay of events since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

Note that the new token must represent a position that is before the current position of the processor.

Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

Type Parameters:
- R - the type of the provided resetContext

Parameters:
- startPosition - the token representing the position to reset the processor to
- resetContext - an R used to support the reset operation

int maxCapacity()
Specifies the maximum number of segments this EventProcessor can process at the same time.

Returns: the maximum number of segments this EventProcessor can process at the same time

Map<Integer,EventTrackerStatus> processingStatus()
Returns the status for each of the segments processed by this processor as EventTrackerStatus instances.

The keys of the Map represent the segment ids processed by this instance. The values of the returned Map represent the last known status of that segment.

Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the processor is processing Events.

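A status map of this shape can be summarized with a simple stream over its values. In the sketch below, `SegmentStatus` is a hypothetical stand-in that mirrors only `isCaughtUp()` from EventTrackerStatus, and `StatusReport` is an illustrative helper name.

```java
import java.util.Map;

// Hypothetical stand-in (not part of the framework): mirrors only the
// isCaughtUp() method of EventTrackerStatus.
interface SegmentStatus {
    boolean isCaughtUp();
}

class StatusReport {

    // Counts the segments of this instance that have not caught up yet.
    // The map is keyed by segment id, as described above.
    static long countLagging(Map<Integer, ? extends SegmentStatus> statuses) {
        return statuses.values().stream()
                .filter(status -> !status.isCaughtUp())
                .count();
    }
}
```

Because the returned Map reflects ongoing changes, repeated calls on the same map may yield different counts while the processor is running.
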
default boolean isReplaying()
Returns the overall replay status of this StreamingEventProcessor. Any other instances of this streaming processor running on other applications are not taken into account in this calculation.

Note that when an EventTrackerStatus returns true for both EventTrackerStatus.isReplaying() and EventTrackerStatus.isCaughtUp(), the replay is done but the processor did not handle any new events yet.

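The rule described here can be expressed as a predicate over the per-segment statuses. This is a sketch of the documented behavior, not the framework's own implementation: `ReplayStatus` is a hypothetical stand-in mirroring the two EventTrackerStatus methods named above, and `ReplayCheck` is an illustrative helper name.

```java
import java.util.Collection;

// Hypothetical stand-in (not part of the framework): mirrors the two
// EventTrackerStatus methods referenced in the note above.
interface ReplayStatus {
    boolean isReplaying();

    boolean isCaughtUp();
}

class ReplayCheck {

    // A segment counts as still replaying only while it is replaying and
    // has not caught up; replaying plus caught up means the replay is done.
    static boolean anySegmentStillReplaying(Collection<? extends ReplayStatus> statuses) {
        return statuses.stream()
                .anyMatch(status -> status.isReplaying() && !status.isCaughtUp());
    }
}
```
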
Returns: true if any of the segments is still replaying and not caught up yet, false otherwise

Copyright © 2010–2024. All rights reserved.