public class TrackingEventProcessor extends AbstractEventProcessor implements StreamingEventProcessor, Lifecycle
StreamableMessageSource
.
A supplied TokenStore
allows the EventProcessor to keep track of its position in the event log. After
processing an event batch the EventProcessor updates its tracking token in the TokenStore.
A TrackingEventProcessor is able to continue processing from the last stored token when it is restarted. It is also
capable of replaying events from any starting token. To replay the entire event log, simply invoke resetTokens()
on this processor to adjust the positions of the TrackingToken
(s) within the TokenStore
. To replay from a specific point, resetTokens(Function)
can be utilized to define the new point
to start at.
Note, the AbstractEventProcessor.getName()
of this StreamingEventProcessor
is used to obtain the tracking token from the
TokenStore
, so take care when renaming a TrackingEventProcessor
.
Modifier and Type | Class and Description |
---|---|
static class |
TrackingEventProcessor.Builder
Builder class to instantiate a
TrackingEventProcessor . |
protected static class |
TrackingEventProcessor.State
Enum representing the possible states of the Processor
|
Lifecycle.LifecycleHandler, Lifecycle.LifecycleRegistry
spanFactory
Modifier | Constructor and Description |
---|---|
protected |
TrackingEventProcessor(TrackingEventProcessor.Builder builder)
Instantiate a
TrackingEventProcessor based on the fields contained in the TrackingEventProcessor.Builder . |
Modifier and Type | Method and Description |
---|---|
int |
activeProcessorThreads()
Returns an approximation of the number of threads currently processing events.
|
int |
availableProcessorThreads()
Returns the number of threads this processor has available to assign segments.
|
static TrackingEventProcessor.Builder |
builder()
Instantiate a Builder to be able to create a
TrackingEventProcessor . |
protected boolean |
canHandle(EventMessage<?> eventMessage,
Collection<Segment> segments)
Indicates whether any of the components handling events for this Processor are able to handle the given
eventMessage for any of the given segments . |
CompletableFuture<Boolean> |
claimSegment(int segmentId)
Instructs the processor to claim the segment with given
segmentId and start processing it as soon as
possible. |
protected void |
doSleepFor(long millisToSleep)
Instructs the current Thread to sleep until the given deadline.
|
protected void |
doSleepFor(long millisToSleep,
AtomicBoolean interruptFlag)
Instructs the current Thread to sleep until the given deadline.
|
StreamableMessageSource<? extends TrackedEventMessage<?>> |
getMessageSource()
Returns the
StreamableMessageSource this processor is using |
protected TrackingEventProcessor.State |
getState()
Get the state of the event processor.
|
String |
getTokenStoreIdentifier()
Returns the unique identifier of the
TokenStore used by this StreamingEventProcessor . |
boolean |
isError()
Indicates whether the processor has been shut down due to an error.
|
boolean |
isRunning()
Indicates whether this processor is currently running (i.e.
|
int |
maxCapacity()
Specifies the maximum amount of segments this
EventProcessor can process at the same time. |
CompletableFuture<Boolean> |
mergeSegment(int segmentId)
Instruct the processor to merge the segment with given
segmentId back with the segment that it was
originally split from. |
protected void |
processingLoop(Segment segment)
Fetch and process event batches continuously for as long as the processor is not shutting down.
|
protected Set<Segment> |
processingSegments(TrackingToken token,
Segment segment)
Indicates whether the
eventMessage identified with given token should be processed as part of the
given segment . |
Map<Integer,EventTrackerStatus> |
processingStatus()
Returns the status for each of the segments processed by this processor as
EventTrackerStatus instances. |
void |
registerLifecycleHandlers(Lifecycle.LifecycleRegistry handle)
Registers the activities to be executed in the various phases of an application's lifecycle.
|
void |
releaseSegment(int segmentId)
Instructs the processor to release the segment with given
segmentId . |
void |
releaseSegment(int segmentId,
long releaseDuration,
TimeUnit unit)
Instructs the processor to release the segment with given
segmentId . |
void |
resetTokens()
Resets tokens to their initial state.
|
void |
resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)
Reset tokens to the position as return by the given
initialTrackingTokenSupplier . |
<R> void |
resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier,
R resetContext)
Reset tokens to the position as return by the given
initialTrackingTokenSupplier . |
<R> void |
resetTokens(R resetContext)
Resets tokens to their initial state.
|
void |
resetTokens(TrackingToken startPosition)
Resets tokens to the given
startPosition . |
<R> void |
resetTokens(TrackingToken startPosition,
R resetContext)
Resets tokens to the given
startPosition . |
void |
shutDown()
Shuts down the processor.
|
CompletableFuture<Void> |
shutdownAsync()
Initiates a shutdown, providing a
CompletableFuture that completes when the shutdown process is
finished. |
CompletableFuture<Boolean> |
splitSegment(int segmentId)
Instruct the processor to split the segment with given
segmentId into two segments, allowing an
additional process to start processing events from it. |
void |
start()
Start this processor.
|
protected void |
startSegmentWorkers()
Starts workers for a number of segments.
|
boolean |
supportsReset()
Indicates whether this
StreamingEventProcessor supports a "reset". |
canHandle, canHandleType, eventHandlerInvoker, getHandlerInterceptors, getName, processInUnitOfWork, processInUnitOfWork, registerHandlerInterceptor, reportIgnored, toString
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
isReplaying
getHandlerInterceptors, getName
registerHandlerInterceptor
protected TrackingEventProcessor(TrackingEventProcessor.Builder builder)
TrackingEventProcessor
based on the fields contained in the TrackingEventProcessor.Builder
.
Will assert that the Event Processor name
, EventHandlerInvoker
, StreamableMessageSource
,
TokenStore
and TransactionManager
are not null
, and will throw an AxonConfigurationException
if any of them is null
.
builder
- the TrackingEventProcessor.Builder
used to instantiate a TrackingEventProcessor
instancepublic static TrackingEventProcessor.Builder builder()
TrackingEventProcessor
.
The RollbackConfigurationType
defaults to a RollbackConfigurationType.ANY_THROWABLE
, the
ErrorHandler
is defaulted to a PropagatingErrorHandler
, the MessageMonitor
defaults to a
NoOpMessageMonitor
, the TrackingEventProcessorConfiguration
to a
TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
call, and the
EventProcessorSpanFactory
to a DefaultEventProcessorSpanFactory
backed by a
NoOpSpanFactory
. The Event Processor name
, EventHandlerInvoker
,
StreamableMessageSource
, TokenStore
and TransactionManager
are
hard requirements and as such should be provided.
TrackingEventProcessor
public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry handle)
Lifecycle
registerLifecycleHandlers
in interface Lifecycle
handle
- the lifecycle instance to register the handlers withLifecycle.LifecycleRegistry.onShutdown(int, Runnable)
,
LifecycleRegistry#onShutdown(int, LifecycleHandler)
,
Lifecycle.LifecycleRegistry.onStart(int, Runnable)
,
LifecycleRegistry#onStart(int, LifecycleHandler)
public void start()
StreamableMessageSource.openStream(TrackingToken)
. The TrackingToken
used to open the stream will be
fetched from the TokenStore
.
Upon start up of an application, this method will be invoked in the Phase.INBOUND_EVENT_CONNECTORS
phase.
start
in interface EventProcessor
public CompletableFuture<Boolean> splitSegment(int segmentId)
StreamingEventProcessor
segmentId
into two segments, allowing an
additional process to start processing events from it.
To be able to split segments, the TokenStore
configured with this processor must use explicitly
initialized tokens. See TokenStore.requiresExplicitSegmentInitialization()
. Also, the given segmentId
must be currently processed by a process owned by this processor instance.
splitSegment
in interface StreamingEventProcessor
segmentId
- the identifier of the segment to splitCompletableFuture
providing the result of the split operationpublic String getTokenStoreIdentifier()
StreamingEventProcessor
TokenStore
used by this StreamingEventProcessor
.getTokenStoreIdentifier
in interface StreamingEventProcessor
TokenStore
used by this StreamingEventProcessor
public CompletableFuture<Boolean> mergeSegment(int segmentId)
StreamingEventProcessor
segmentId
back with the segment that it was
originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore,
this other segment must not have any active claims in the TokenStore
.
The processor must currently be actively processing the segment with given segmentId
.
Use StreamingEventProcessor.releaseSegment(int)
to force this processor to release any claims with tokens required to merge the
segments.
To find out which segment a given segmentId
should be merged with, use the following procedure:
EventTrackerStatus status = processor.processingStatus().get(segmentId); if (status == null) { // this processor is not processing segmentId, and will not be able to merge } return status.getSegment().mergeableSegmentId();
mergeSegment
in interface StreamingEventProcessor
segmentId
- the identifier of the segment to mergeCompletableFuture
indicating whether the merge was executed successfullyprotected void processingLoop(Segment segment)
Events with the same tracking token (which is possible as result of upcasting) should always be processed in the same batch. In those cases the batch size may be larger than the one configured.
segment
- The Segment
of the Stream that should be processed.protected Set<Segment> processingSegments(TrackingToken token, Segment segment)
eventMessage
identified with given token
should be processed as part of the
given segment
. This implementation is away of merge tokens and will recursively detect the (sub)segment
in which an event should be handled.token
- The token to check segment validity forsegment
- The segment to process the event intrue
if this event should be handled, otherwise false
protected boolean canHandle(EventMessage<?> eventMessage, Collection<Segment> segments) throws Exception
eventMessage
for any of the given segments
.eventMessage
- The message to handlesegments
- The segments to handle the message inException
- when an exception occurs evaluating the messagepublic void releaseSegment(int segmentId)
segmentId
.
This will also ignore the specified this segment for "re-claiming" for twice the TrackingEventProcessorConfiguration.getTokenClaimInterval()
token claim interval.
releaseSegment
in interface StreamingEventProcessor
segmentId
- the id of the segment to releasepublic void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)
segmentId
. This processor will not try to claim
the given segment for the specified releaseDuration
in the given unit
, to ensure it is not
immediately reclaimed. Note that this will override any previous release duration that existed for this segment.
Providing a negative value will allow the segment to be immediately claimed.
If the processor is not actively processing the segment with given segmentId
, claiming it will be ignored
for the given timeframe nonetheless.
This method will put the segment on a non-claim map. During the next iteration of the processingLoop(Segment)
the segments will be unclaimed and the worker stopped if it is found to be in this map. This means it can take
up to the batch processing time, or up to the eventAvailabilityTimeout
if there are no events in the stream
for the segment to be unclaimed.
releaseSegment
in interface StreamingEventProcessor
segmentId
- the id of the segment to be released for the specified releaseDuration
releaseDuration
- the amount of time to disregard segmentId
for processingunit
- the unit of time used to express the releaseDuration
public CompletableFuture<Boolean> claimSegment(int segmentId)
segmentId
and start processing it as soon as
possible.
The given segmentId
must not be currently processed by a different processor instance, as that will
have an active claim on the token. Claiming a segment that is already being processed will have no effect
and return true
.
A true
return value indicates that the segment has been claimed and will be processed by this processor.
The StreamingEventProcessor
may postpone start of work until after completion of this task, as long as
the token has been claimed so work can be started. A return value of false
indicates that the segment
has not been claimed due to the token for that segment not being available.
This method will add an instruction for the WorkerLauncher
and set a flag that interrupts its sleep
to reduce the time it takes for the processor to claim the segment. Note that a thread has to be available for
a segment to start processing. The result will be false
if there is none available.
claimSegment
in interface StreamingEventProcessor
segmentId
- the identifier of the segment to claim and start processingCompletableFuture
providing the result of the claim operationpublic void resetTokens()
StreamingEventProcessor
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens
in interface StreamingEventProcessor
public <R> void resetTokens(R resetContext)
StreamingEventProcessor
resetContext
will be
used to support the (optional) reset operation in an Event Handling Component.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens
in interface StreamingEventProcessor
R
- the type of the provided resetContext
resetContext
- a R
used to support the reset operationpublic void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)
StreamingEventProcessor
initialTrackingTokenSupplier
. This effectively causes
a replay since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens
in interface StreamingEventProcessor
initialTrackingTokenSupplier
- a function returning the token representing the position to reset topublic <R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, R resetContext)
StreamingEventProcessor
initialTrackingTokenSupplier
. This effectively causes
a replay since that position. The given resetContext
will be used to support the (optional) reset
operation in an Event Handling Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens
in interface StreamingEventProcessor
R
- the type of the provided resetContext
initialTrackingTokenSupplier
- a function returning the token representing the position to reset toresetContext
- a R
used to support the reset operationpublic void resetTokens(@Nonnull TrackingToken startPosition)
StreamingEventProcessor
startPosition
. This effectively causes a replay of events since that
position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens
in interface StreamingEventProcessor
startPosition
- the token representing the position to reset the processor topublic <R> void resetTokens(@Nonnull TrackingToken startPosition, R resetContext)
StreamingEventProcessor
startPosition
. This effectively causes a replay of events since that position.
The given resetContext
will be used to support the (optional) reset operation in an Event Handling
Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
resetTokens
in interface StreamingEventProcessor
R
- the type of the provided resetContext
startPosition
- the token representing the position to reset the processor toresetContext
- a R
used to support the reset operationpublic boolean supportsReset()
StreamingEventProcessor
StreamingEventProcessor
supports a "reset". Generally, a reset is supported if at
least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly
prevent the resets.
This method should be invoked prior to invoking any of the StreamingEventProcessor.resetTokens()
operations as an early
validation.
supportsReset
in interface StreamingEventProcessor
true
if resets are supported, false
otherwisepublic boolean isRunning()
EventProcessor
isRunning
in interface EventProcessor
true
when running, otherwise false
public boolean isError()
EventProcessor
Note that this method returns false
when the processor was stopped using EventProcessor.shutDown()
.
isError
in interface EventProcessor
true
when paused due to an error, otherwise false
public StreamableMessageSource<? extends TrackedEventMessage<?>> getMessageSource()
StreamableMessageSource
this processor is usingStreamableMessageSource
public void shutDown()
shutDown
in interface EventProcessor
public CompletableFuture<Void> shutdownAsync()
CompletableFuture
that completes when the shutdown process is
finished.
Will be shutdown on the Phase.INBOUND_EVENT_CONNECTORS
phase.
shutdownAsync
in interface EventProcessor
public int availableProcessorThreads()
public int maxCapacity()
StreamingEventProcessor
EventProcessor
can process at the same time.maxCapacity
in interface StreamingEventProcessor
EventProcessor
can process at the same timepublic int activeProcessorThreads()
public Map<Integer,EventTrackerStatus> processingStatus()
StreamingEventProcessor
EventTrackerStatus
instances.
The key of the Map
represent the segment ids processed by this instance. The values of the returned
Map
represent the last known status of that segment.
Note that the returned Map
is unmodifiable, but does reflect any changes made to the status as the
processor is processing Events.
processingStatus
in interface StreamingEventProcessor
protected TrackingEventProcessor.State getState()
protected void startSegmentWorkers()
root
segment exists
in the TokenStore, it will be split in multiple segments as configured
by the TrackingEventProcessorConfiguration.andInitialSegmentsCount(int)
, otherwise the existing segments
in the TokenStore will be used.
An attempt will be made to instantiate a worker for each segment. This will succeed when the number of threads
matches the requested segments. The number of active threads can be configured with TrackingEventProcessorConfiguration.forParallelProcessing(int)
. When insufficient threads are available to serve
the number of segments, it will result in some segments not being processed.protected void doSleepFor(long millisToSleep)
The default implementation will sleep in blocks of 100ms, intermittently checking for the processor's state. Once the processor stops running, this method will return immediately (after detecting the state change).
millisToSleep
- The number of milliseconds to sleepprotected void doSleepFor(long millisToSleep, AtomicBoolean interruptFlag)
The default implementation will sleep in blocks of 100ms, intermittently checking for the processor's state. Once the processor stops running, this method will return immediately (after detecting the state change).
This method will also return when the given interruptFlag is set to true
. This can facilitate immediately
needed actions for the sleeping thread.
millisToSleep
- The number of milliseconds to sleepinterruptFlag
- A flag that is set when the thread should stop sleepingCopyright © 2010–2024. All rights reserved.