public class TrackingEventProcessor extends AbstractEventProcessor
StreamableMessageSource
.
A supplied TokenStore
allows the EventProcessor to keep track of its position in the event log. After
processing an event batch the EventProcessor updates its tracking token in the TokenStore.
A TrackingEventProcessor is able to continue processing from the last stored token when it is restarted. It is also capable of replaying events from any starting token. To replay the entire event log simply remove the tracking token of this processor from the TokenStore. To replay from a given point first update the entry for this processor in the TokenStore before starting this processor.
Note, the name
of the EventProcessor is used to obtain the tracking token from the TokenStore, so
take care when renaming a TrackingEventProcessor.
Modifier and Type | Class and Description |
---|---|
static class |
TrackingEventProcessor.Builder
Builder class to instantiate a
TrackingEventProcessor . |
protected static class |
TrackingEventProcessor.State
Enum representing the possible states of the Processor
|
Modifier | Constructor and Description |
---|---|
protected |
TrackingEventProcessor(TrackingEventProcessor.Builder builder)
Instantiate a
TrackingEventProcessor based on the fields contained in the TrackingEventProcessor.Builder . |
Modifier and Type | Method and Description |
---|---|
int |
activeProcessorThreads()
Returns an approximation of the number of threads currently processing events.
|
int |
availableProcessorThreads()
Returns the number of threads this processor has available to assign segments.
|
static TrackingEventProcessor.Builder |
builder()
Instantiate a Builder to be able to create a
TrackingEventProcessor . |
protected void |
doSleepFor(long millisToSleep)
Instructs the current Thread to sleep until the given deadline.
|
protected TrackingEventProcessor.State |
getState()
Get the state of the event processor.
|
boolean |
isError()
Indicates whether the processor has been paused due to an error.
|
boolean |
isRunning()
Indicates whether this processor is currently running (i.e.
|
protected void |
processingLoop(Segment segment)
Fetch and process event batches continuously for as long as the processor is not shutting down.
|
Map<Integer,EventTrackerStatus> |
processingStatus()
Returns the status for each of the Segments processed by the current processor.
|
void |
releaseSegment(int segmentId)
Instructs the processor to release the segment with given
segmentId . |
void |
releaseSegment(int segmentId,
long blacklistDuration,
TimeUnit unit)
Instructs the processor to release the segment with given
segmentId . |
void |
resetTokens()
Resets tokens to their initial state.
|
void |
resetTokens(Function<StreamableMessageSource,TrackingToken> initialTrackingTokenSupplier)
Reset tokens to the position as return by the given
initialTrackingTokenSupplier . |
void |
resetTokens(TrackingToken startPosition)
Resets tokens to the given
startPosition . |
void |
shutDown()
Shut down the processor.
|
void |
start()
Start this processor.
|
protected void |
startSegmentWorkers()
Starts the
workers for a number of segments. |
boolean |
supportsReset()
Indicates whether this tracking processor supports a "reset".
|
canHandle, eventHandlerInvoker, getHandlerInterceptors, getName, processInUnitOfWork, registerHandlerInterceptor, reportIgnored, toString
protected TrackingEventProcessor(TrackingEventProcessor.Builder builder)
TrackingEventProcessor
based on the fields contained in the TrackingEventProcessor.Builder
.
Will assert that the Event Processor name
, EventHandlerInvoker
, StreamableMessageSource
,
TokenStore
and TransactionManager
are not null
, and will throw an
AxonConfigurationException
if any of them is null
.
builder
- the TrackingEventProcessor.Builder
used to instantiate a TrackingEventProcessor
instancepublic static TrackingEventProcessor.Builder builder()
TrackingEventProcessor
.
The RollbackConfigurationType
defaults to a RollbackConfigurationType.ANY_THROWABLE
, the
ErrorHandler
is defaulted to a PropagatingErrorHandler
, the MessageMonitor
defaults to a
NoOpMessageMonitor
and the TrackingEventProcessorConfiguration
to a
TrackingEventProcessorConfiguration.forSingleThreadedProcessing()
call. The Event Processor name
,
EventHandlerInvoker
, StreamableMessageSource
, TokenStore
and TransactionManager
are hard requirements and as such should be provided.
TrackingEventProcessor
public void start()
StreamableMessageSource.openStream(TrackingToken)
. The TrackingToken
used to open the stream will be
fetched from the TokenStore
.protected void processingLoop(Segment segment)
Events with the same tracking token (which is possible as result of upcasting) should always be processed in the same batch. In those cases the batch size may be larger than the one configured.
segment
- The Segment
of the Stream that should be processed.public void releaseSegment(int segmentId)
segmentId
. This will also blacklist this
segment for twice the token claim interval
,
to ensure it is not immediately reclaimed.segmentId
- the id of the segment to be blacklistedpublic void releaseSegment(int segmentId, long blacklistDuration, TimeUnit unit)
segmentId
. This will also blacklist this
segment for the given blacklistDuration
, to ensure it is not immediately reclaimed. Note that this will
override any previous blacklist duration that existed for this segment. Providing a negative value will allow
the segment to be immediately claimed.
If the processor is not actively processing the segment with given segmentId
, it will be blacklisted
nonetheless.
segmentId
- the id of the segment to be blacklistedblacklistDuration
- the amount of time to blacklist this segment for processing by this processor instanceunit
- the unit of time used to express the blacklistDuration
public void resetTokens()
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
public void resetTokens(Function<StreamableMessageSource,TrackingToken> initialTrackingTokenSupplier)
initialTrackingTokenSupplier
. This effectively causes
a replay since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
initialTrackingTokenSupplier
- A function returning the token representing the position to reset topublic void resetTokens(TrackingToken startPosition)
startPosition
. This effectively causes a replay of events since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
startPosition
- The token representing the position to reset the processor to.public boolean supportsReset()
true
if resets are supported, false
otherwisepublic boolean isRunning()
true
when running, otherwise false
public boolean isError()
Note that this method also returns false
when the processor was stooped using shutDown()
.
true
when paused due to an error, otherwise false
public void shutDown()
public int availableProcessorThreads()
public int activeProcessorThreads()
public Map<Integer,EventTrackerStatus> processingStatus()
Note that the returned Map in unmodifiable, but does reflect any changes made to the status as the processor is processing Events.
protected TrackingEventProcessor.State getState()
protected void startSegmentWorkers()
workers
for a number of segments. When only the
root
segment exists
in the TokenStore,
it will be split in multiple segments as configured by the
TrackingEventProcessorConfiguration.andInitialSegmentsCount(int)
, otherwise the existing segments in
the TokenStore will be used.
An attempt will be made to instantiate a TrackingSegmentWorker
for each segment. This will succeed when
the number of threads matches the requested segments. The number of active threads can be configured with
TrackingEventProcessorConfiguration.forParallelProcessing(int)
. When insufficient threads are available
to serve the number of segments, it will result in some segments not being processed.protected void doSleepFor(long millisToSleep)
The default implementation will sleep in blocks of 100ms, intermittently checking for the processor's state. Once the processor stops running, this method will return immediately (after detecting the state change).
millisToSleep
- The number of milliseconds to sleepCopyright © 2010–2018. All rights reserved.