public class TrackingEventProcessor extends AbstractEventProcessor implements StreamingEventProcessor, Lifecycle
StreamableMessageSource.
A supplied TokenStore allows the EventProcessor to keep track of its position in the event log. After
processing an event batch the EventProcessor updates its tracking token in the TokenStore.
A TrackingEventProcessor is able to continue processing from the last stored token when it is restarted. It is also
capable of replaying events from any starting token. To replay the entire event log, simply invoke resetTokens() on this processor to adjust the positions of the TrackingToken(s) within the TokenStore. To replay from a specific point, resetTokens(Function) can be utilized to define the new point
to start at.
Note, the AbstractEventProcessor.getName() of this StreamingEventProcessor is used to obtain the tracking token from the
TokenStore, so take care when renaming a TrackingEventProcessor.
| Modifier and Type | Class and Description |
|---|---|
static class |
TrackingEventProcessor.Builder
Builder class to instantiate a
TrackingEventProcessor. |
protected static class |
TrackingEventProcessor.State
Enum representing the possible states of the Processor
|
Lifecycle.LifecycleHandler, Lifecycle.LifecycleRegistryspanFactory| Modifier | Constructor and Description |
|---|---|
protected |
TrackingEventProcessor(TrackingEventProcessor.Builder builder)
Instantiate a
TrackingEventProcessor based on the fields contained in the TrackingEventProcessor.Builder. |
| Modifier and Type | Method and Description |
|---|---|
int |
activeProcessorThreads()
Returns an approximation of the number of threads currently processing events.
|
int |
availableProcessorThreads()
Returns the number of threads this processor has available to assign segments.
|
static TrackingEventProcessor.Builder |
builder()
Instantiate a Builder to be able to create a
TrackingEventProcessor. |
protected boolean |
canHandle(EventMessage<?> eventMessage,
Collection<Segment> segments)
Indicates whether any of the components handling events for this Processor are able to handle the given
eventMessage for any of the given segments. |
protected void |
doSleepFor(long millisToSleep)
Instructs the current Thread to sleep until the given deadline.
|
StreamableMessageSource<? extends TrackedEventMessage<?>> |
getMessageSource()
Returns the
StreamableMessageSource this processor is using |
protected TrackingEventProcessor.State |
getState()
Get the state of the event processor.
|
String |
getTokenStoreIdentifier()
Returns the unique identifier of the
TokenStore used by this StreamingEventProcessor. |
boolean |
isError()
Indicates whether the processor has been shut down due to an error.
|
boolean |
isRunning()
Indicates whether this processor is currently running (i.e.
|
int |
maxCapacity()
Specifies the maximum amount of segments this
EventProcessor can process at the same time. |
CompletableFuture<Boolean> |
mergeSegment(int segmentId)
Instruct the processor to merge the segment with given
segmentId back with the segment that it was
originally split from. |
protected void |
processingLoop(Segment segment)
Fetch and process event batches continuously for as long as the processor is not shutting down.
|
protected Set<Segment> |
processingSegments(TrackingToken token,
Segment segment)
Indicates whether the
eventMessage identified with given token should be processed as part of the
given segment. |
Map<Integer,EventTrackerStatus> |
processingStatus()
Returns the status for each of the segments processed by this processor as
EventTrackerStatus instances. |
void |
registerLifecycleHandlers(Lifecycle.LifecycleRegistry handle)
Registers the activities to be executed in the various phases of an application's lifecycle.
|
void |
releaseSegment(int segmentId)
Instructs the processor to release the segment with given
segmentId. |
void |
releaseSegment(int segmentId,
long releaseDuration,
TimeUnit unit)
Instructs the processor to release the segment with given
segmentId. |
void |
resetTokens()
Resets tokens to their initial state.
|
void |
resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)
Reset tokens to the position as return by the given
initialTrackingTokenSupplier. |
<R> void |
resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier,
R resetContext)
Reset tokens to the position as return by the given
initialTrackingTokenSupplier. |
<R> void |
resetTokens(R resetContext)
Resets tokens to their initial state.
|
void |
resetTokens(TrackingToken startPosition)
Resets tokens to the given
startPosition. |
<R> void |
resetTokens(TrackingToken startPosition,
R resetContext)
Resets tokens to the given
startPosition. |
void |
shutDown()
Shuts down the processor.
|
CompletableFuture<Void> |
shutdownAsync()
Initiates a shutdown, providing a
CompletableFuture that completes when the shutdown process is
finished. |
CompletableFuture<Boolean> |
splitSegment(int segmentId)
Instruct the processor to split the segment with given
segmentId into two segments, allowing an
additional process to start processing events from it. |
void |
start()
Start this processor.
|
protected void |
startSegmentWorkers()
Starts workers for a number of segments.
|
boolean |
supportsReset()
Indicates whether this
StreamingEventProcessor supports a "reset". |
canHandle, canHandleType, eventHandlerInvoker, getHandlerInterceptors, getName, processInUnitOfWork, processInUnitOfWork, registerHandlerInterceptor, reportIgnored, toStringclone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitisReplayinggetHandlerInterceptors, getNameregisterHandlerInterceptorprotected TrackingEventProcessor(TrackingEventProcessor.Builder builder)
TrackingEventProcessor based on the fields contained in the TrackingEventProcessor.Builder.
Will assert that the Event Processor name, EventHandlerInvoker, StreamableMessageSource,
TokenStore and TransactionManager are not null, and will throw an AxonConfigurationException if any of them is null.
builder - the TrackingEventProcessor.Builder used to instantiate a TrackingEventProcessor instancepublic static TrackingEventProcessor.Builder builder()
TrackingEventProcessor.
The RollbackConfigurationType defaults to a RollbackConfigurationType.ANY_THROWABLE, the
ErrorHandler is defaulted to a PropagatingErrorHandler, the MessageMonitor defaults to a
NoOpMessageMonitor, the TrackingEventProcessorConfiguration to a
TrackingEventProcessorConfiguration.forSingleThreadedProcessing() call, and the SpanFactory to a
NoOpSpanFactory. The Event Processor name, EventHandlerInvoker,
StreamableMessageSource, TokenStore and TransactionManager are
hard requirements and as such should be provided.
TrackingEventProcessorpublic void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry handle)
LifecycleregisterLifecycleHandlers in interface Lifecyclehandle - the lifecycle instance to register the handlers withLifecycle.LifecycleRegistry.onShutdown(int, Runnable),
LifecycleRegistry#onShutdown(int, LifecycleHandler),
Lifecycle.LifecycleRegistry.onStart(int, Runnable),
LifecycleRegistry#onStart(int, LifecycleHandler)public void start()
StreamableMessageSource.openStream(TrackingToken). The TrackingToken used to open the stream will be
fetched from the TokenStore.
Upon start up of an application, this method will be invoked in the Phase.INBOUND_EVENT_CONNECTORS
phase.
start in interface EventProcessorpublic CompletableFuture<Boolean> splitSegment(int segmentId)
StreamingEventProcessorsegmentId into two segments, allowing an
additional process to start processing events from it.
To be able to split segments, the TokenStore configured with this processor must use explicitly
initialized tokens. See TokenStore.requiresExplicitSegmentInitialization(). Also, the given segmentId must be currently processed by a process owned by this processor instance.
splitSegment in interface StreamingEventProcessorsegmentId - the identifier of the segment to splitCompletableFuture providing the result of the split operationpublic String getTokenStoreIdentifier()
StreamingEventProcessorTokenStore used by this StreamingEventProcessor.getTokenStoreIdentifier in interface StreamingEventProcessorTokenStore used by this StreamingEventProcessorpublic CompletableFuture<Boolean> mergeSegment(int segmentId)
StreamingEventProcessorsegmentId back with the segment that it was
originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore,
this other segment must not have any active claims in the TokenStore.
The processor must currently be actively processing the segment with given segmentId.
Use StreamingEventProcessor.releaseSegment(int) to force this processor to release any claims with tokens required to merge the
segments.
To find out which segment a given segmentId should be merged with, use the following procedure:
EventTrackerStatus status = processor.processingStatus().get(segmentId);
if (status == null) {
// this processor is not processing segmentId, and will not be able to merge
}
return status.getSegment().mergeableSegmentId();
mergeSegment in interface StreamingEventProcessorsegmentId - the identifier of the segment to mergeCompletableFuture indicating whether the merge was executed successfullyprotected void processingLoop(Segment segment)
Events with the same tracking token (which is possible as result of upcasting) should always be processed in the same batch. In those cases the batch size may be larger than the one configured.
segment - The Segment of the Stream that should be processed.protected Set<Segment> processingSegments(TrackingToken token, Segment segment)
eventMessage identified with given token should be processed as part of the
given segment. This implementation is away of merge tokens and will recursively detect the (sub)segment
in which an event should be handled.token - The token to check segment validity forsegment - The segment to process the event intrue if this event should be handled, otherwise falseprotected boolean canHandle(EventMessage<?> eventMessage, Collection<Segment> segments) throws Exception
eventMessage for any of the given segments.eventMessage - The message to handlesegments - The segments to handle the message inException - when an exception occurs evaluating the messagepublic void releaseSegment(int segmentId)
segmentId.
This will also ignore the specified this segment for "re-claiming" for twice the TrackingEventProcessorConfiguration.getTokenClaimInterval() token claim interval.
releaseSegment in interface StreamingEventProcessorsegmentId - the id of the segment to releasepublic void releaseSegment(int segmentId,
long releaseDuration,
TimeUnit unit)
StreamingEventProcessorsegmentId. This processor will not try to claim
the given segment for the specified releaseDuration in the given unit, to ensure it is not
immediately reclaimed. Note that this will override any previous release duration that existed for this segment.
Providing a negative value will allow the segment to be immediately claimed.
If the processor is not actively processing the segment with given segmentId, claiming it will be ignored
for the given timeframe nonetheless.
releaseSegment in interface StreamingEventProcessorsegmentId - the id of the segment to be released for the specified releaseDurationreleaseDuration - the amount of time to disregard segmentId for processingunit - the unit of time used to express the releaseDurationpublic void resetTokens()
StreamingEventProcessorBefore attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens in interface StreamingEventProcessorpublic <R> void resetTokens(R resetContext)
StreamingEventProcessorresetContext will be
used to support the (optional) reset operation in an Event Handling Component.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens in interface StreamingEventProcessorR - the type of the provided resetContextresetContext - a R used to support the reset operationpublic void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)
StreamingEventProcessorinitialTrackingTokenSupplier. This effectively causes
a replay since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens in interface StreamingEventProcessorinitialTrackingTokenSupplier - a function returning the token representing the position to reset topublic <R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, R resetContext)
StreamingEventProcessorinitialTrackingTokenSupplier. This effectively causes
a replay since that position. The given resetContext will be used to support the (optional) reset
operation in an Event Handling Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens in interface StreamingEventProcessorR - the type of the provided resetContextinitialTrackingTokenSupplier - a function returning the token representing the position to reset toresetContext - a R used to support the reset operationpublic void resetTokens(@Nonnull TrackingToken startPosition)
StreamingEventProcessorstartPosition. This effectively causes a replay of events since that
position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens in interface StreamingEventProcessorstartPosition - the token representing the position to reset the processor topublic <R> void resetTokens(@Nonnull TrackingToken startPosition, R resetContext)
StreamingEventProcessorstartPosition. This effectively causes a replay of events since that position.
The given resetContext will be used to support the (optional) reset operation in an Event Handling
Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens in interface StreamingEventProcessorR - the type of the provided resetContextstartPosition - the token representing the position to reset the processor toresetContext - a R used to support the reset operationpublic boolean supportsReset()
StreamingEventProcessorStreamingEventProcessor supports a "reset". Generally, a reset is supported if at
least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly
prevent the resets.
This method should be invoked prior to invoking any of the StreamingEventProcessor.resetTokens() operations as an early
validation.
supportsReset in interface StreamingEventProcessortrue if resets are supported, false otherwisepublic boolean isRunning()
EventProcessorisRunning in interface EventProcessortrue when running, otherwise falsepublic boolean isError()
EventProcessor
Note that this method returns false when the processor was stopped using EventProcessor.shutDown().
isError in interface EventProcessortrue when paused due to an error, otherwise falsepublic StreamableMessageSource<? extends TrackedEventMessage<?>> getMessageSource()
StreamableMessageSource this processor is usingStreamableMessageSourcepublic void shutDown()
shutDown in interface EventProcessorpublic CompletableFuture<Void> shutdownAsync()
CompletableFuture that completes when the shutdown process is
finished.
Will be shutdown on the Phase.INBOUND_EVENT_CONNECTORS phase.
shutdownAsync in interface EventProcessorpublic int availableProcessorThreads()
public int maxCapacity()
StreamingEventProcessorEventProcessor can process at the same time.maxCapacity in interface StreamingEventProcessorEventProcessor can process at the same timepublic int activeProcessorThreads()
public Map<Integer,EventTrackerStatus> processingStatus()
StreamingEventProcessorEventTrackerStatus instances.
The key of the Map represent the segment ids processed by this instance. The values of the returned
Map represent the last known status of that segment.
Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the
processor is processing Events.
processingStatus in interface StreamingEventProcessorprotected TrackingEventProcessor.State getState()
protected void startSegmentWorkers()
root segment exists in the TokenStore, it will be split in multiple segments as configured
by the TrackingEventProcessorConfiguration.andInitialSegmentsCount(int), otherwise the existing segments
in the TokenStore will be used.
An attempt will be made to instantiate a worker for each segment. This will succeed when the number of threads
matches the requested segments. The number of active threads can be configured with TrackingEventProcessorConfiguration.forParallelProcessing(int). When insufficient threads are available to serve
the number of segments, it will result in some segments not being processed.protected void doSleepFor(long millisToSleep)
The default implementation will sleep in blocks of 100ms, intermittently checking for the processor's state. Once the processor stops running, this method will return immediately (after detecting the state change).
millisToSleep - The number of milliseconds to sleepCopyright © 2010–2023. All rights reserved.