public class TrackingEventProcessor extends AbstractEventProcessor
StreamableMessageSource.
A supplied TokenStore allows the EventProcessor to keep track of its position in the event log. After
processing an event batch the EventProcessor updates its tracking token in the TokenStore.
A TrackingEventProcessor is able to continue processing from the last stored token when it is restarted. It is also capable of replaying events from any starting token. To replay the entire event log simply remove the tracking token of this processor from the TokenStore. To replay from a given point first update the entry for this processor in the TokenStore before starting this processor.
Note, the name of the EventProcessor is used to obtain the tracking token from the TokenStore, so
take care when renaming a TrackingEventProcessor.
| Modifier and Type | Class and Description |
|---|---|
protected static class |
TrackingEventProcessor.State |
| Constructor and Description |
|---|
TrackingEventProcessor(String name,
EventHandlerInvoker eventHandlerInvoker,
RollbackConfiguration rollbackConfiguration,
ErrorHandler errorHandler,
StreamableMessageSource<TrackedEventMessage<?>> messageSource,
TokenStore tokenStore,
TransactionManager transactionManager,
int batchSize,
MessageMonitor<? super EventMessage<?>> messageMonitor)
Initializes an EventProcessor with given
name that subscribes to the given messageSource for
events. |
TrackingEventProcessor(String name,
EventHandlerInvoker eventHandlerInvoker,
StreamableMessageSource<TrackedEventMessage<?>> messageSource,
TokenStore tokenStore,
TransactionManager transactionManager)
Initializes an EventProcessor with given
name that subscribes to the given messageSource for
events. |
TrackingEventProcessor(String name,
EventHandlerInvoker eventHandlerInvoker,
StreamableMessageSource<TrackedEventMessage<?>> messageSource,
TokenStore tokenStore,
TransactionManager transactionManager,
int batchSize)
Initializes an EventProcessor with given
name that subscribes to the given messageSource for
events. |
TrackingEventProcessor(String name,
EventHandlerInvoker eventHandlerInvoker,
StreamableMessageSource<TrackedEventMessage<?>> messageSource,
TokenStore tokenStore,
TransactionManager transactionManager,
MessageMonitor<? super EventMessage<?>> messageMonitor)
Initializes an EventProcessor with given
name that subscribes to the given messageSource for
events. |
| Modifier and Type | Method and Description |
|---|---|
int |
activeProcessorThreads()
Returns an approximation of the number of threads currently processing events.
|
protected TrackingEventProcessor.State |
getState()
Get the state of the event processor.
|
boolean |
isError()
Indicates whether the processor has been paused due to an error.
|
boolean |
isRunning()
Indicates whether this processor is currently running (i.e.
|
void |
pause()
Stops processing if it currently running, but doesn't stop free up the processing thread.
|
protected void |
processingLoop()
Fetch and process event batches continuously for as long as the processor is not shutting down.
|
void |
shutDown()
Shut down the processor.
|
void |
start()
Start this processor.
|
getName, process, registerInterceptor, toStringpublic TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager)
name that subscribes to the given messageSource for
events. Actual handling of event messages is deferred to the given eventHandlerInvoker.
The EventProcessor is initialized with a batch size of 1, a PropagatingErrorHandler, a RollbackConfigurationType.ANY_THROWABLE and a NoOpMessageMonitor.
name - The name of the event processoreventHandlerInvoker - The component that handles the individual eventsmessageSource - The message source (e.g. Event Bus) which this event processor will tracktokenStore - Used to store and fetch event tokens that enable the processor to track its progresstransactionManager - The transaction manager used when processing messagespublic TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager, int batchSize)
name that subscribes to the given messageSource for
events. Actual handling of event messages is deferred to the given eventHandlerInvoker.
The EventProcessor is initialized with a batch size of 1, a PropagatingErrorHandler, a RollbackConfigurationType.ANY_THROWABLE and a NoOpMessageMonitor.
name - The name of the event processoreventHandlerInvoker - The component that handles the individual eventsmessageSource - The message source (e.g. Event Bus) which this event processor will tracktokenStore - Used to store and fetch event tokens that enable the processor to track its progresstransactionManager - The transaction manager used when processing messagesbatchSize - The maximum number of events to process in a single batchpublic TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager, MessageMonitor<? super EventMessage<?>> messageMonitor)
name that subscribes to the given messageSource for
events. Actual handling of event messages is deferred to the given eventHandlerInvoker.
The EventProcessor is initialized with a batch size of 1, a PropagatingErrorHandler and a RollbackConfigurationType.ANY_THROWABLE.
name - The name of the event processoreventHandlerInvoker - The component that handles the individual eventsmessageSource - The message source (e.g. Event Bus) which this event processor will tracktokenStore - Used to store and fetch event tokens that enable the processor to track its progresstransactionManager - The transaction manager used when processing messagesmessageMonitor - Monitor to be invoked before and after event processingpublic TrackingEventProcessor(String name, EventHandlerInvoker eventHandlerInvoker, RollbackConfiguration rollbackConfiguration, ErrorHandler errorHandler, StreamableMessageSource<TrackedEventMessage<?>> messageSource, TokenStore tokenStore, TransactionManager transactionManager, int batchSize, MessageMonitor<? super EventMessage<?>> messageMonitor)
name that subscribes to the given messageSource for
events. Actual handling of event messages is deferred to the given eventHandlerInvoker.name - The name of the event processoreventHandlerInvoker - The component that handles the individual eventsrollbackConfiguration - Determines rollback behavior of the UnitOfWork while processing a batch of eventserrorHandler - Invoked when a UnitOfWork is rolled back during processingmessageSource - The message source (e.g. Event Bus) which this event processor will tracktokenStore - Used to store and fetch event tokens that enable the processor to track its
progresstransactionManager - The transaction manager used when processing messagesbatchSize - The maximum number of events to process in a single batchmessageMonitor - Monitor to be invoked before and after event processingpublic void start()
StreamableMessageSource.openStream(TrackingToken). The TrackingToken used to open the stream will be
fetched from the TokenStore.protected void processingLoop()
Events with the same tracking token (which is possible as result of upcasting) should always be processed in the same batch. In those cases the batch size may be larger than the one configured.
public void pause()
public boolean isRunning()
true when running, otherwise falsepublic boolean isError()
Note that this method also returns false when the processor was paused using pause().
true when paused due to an error, otherwise falsepublic void shutDown()
public int activeProcessorThreads()
protected TrackingEventProcessor.State getState()
Copyright © 2010–2017. All rights reserved.