public class PooledStreamingEventProcessor extends AbstractEventProcessor implements StreamingEventProcessor, Lifecycle
A StreamingEventProcessor implementation which pools its resources to enhance processing speed. It utilizes a Coordinator as the means to stream events from a StreamableMessageSource and creates so-called work packages. Every work package is in charge of a Segment of the entire event stream. It is the Coordinator's job to retrieve the events from the source and provide them to all the work packages it is in charge of.
This approach utilizes two thread pools: the coordinator thread pool, which retrieves the events and provides them to the work packages, and the work package thread pool, which actually handles the events. It is this approach which allows for greater parallelization and processing speed than the TrackingEventProcessor.
If no TrackingTokens are present for this processor, the PooledStreamingEventProcessor will initialize them in a given segment count. By default it will create 16 segments, which can be configured through PooledStreamingEventProcessor.Builder.initialSegmentCount(int).
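The division of labour between the two pools can be illustrated with a small, self-contained sketch. It is not the processor's actual implementation: `TwoPoolSketch`, the modulo-based routing, and the use of plain strings as events are all assumptions made for illustration. One coordinator thread reads the "stream" and hands each event to the work package of its segment, while a separate worker pool does the handling, one event at a time per segment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model: one coordinator thread feeds per-segment work packages run on a worker pool. */
final class TwoPoolSketch {

    /** Returns, per segment id, the events handled for that segment in arrival order. */
    static Map<Integer, List<String>> process(List<String> events, int segmentCount) {
        ExecutorService coordinatorPool = Executors.newSingleThreadExecutor();
        ExecutorService workerPool = Executors.newFixedThreadPool(4);
        Map<Integer, List<String>> handled = new HashMap<>();
        // The "tail" of each work package: the last scheduled handling step for that segment.
        Map<Integer, CompletableFuture<Void>> tails = new HashMap<>();
        for (int i = 0; i < segmentCount; i++) {
            handled.put(i, Collections.synchronizedList(new ArrayList<>()));
            tails.put(i, CompletableFuture.completedFuture(null));
        }
        try {
            // Coordinator task: read every event once and route it to its segment (assumed: hash modulo).
            coordinatorPool.submit(() -> {
                for (String event : events) {
                    int segment = Math.floorMod(event.hashCode(), segmentCount);
                    List<String> target = handled.get(segment);
                    // Worker task: chained behind the previous one, so a segment is handled sequentially.
                    tails.computeIfPresent(segment,
                            (id, tail) -> tail.thenRunAsync(() -> target.add(event), workerPool));
                }
            }).get(5, TimeUnit.SECONDS);
            // Wait until every work package has drained its scheduled events.
            CompletableFuture.allOf(tails.values().toArray(new CompletableFuture[0]))
                             .get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while processing", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("processing did not complete", e);
        } finally {
            coordinatorPool.shutdown();
            workerPool.shutdown();
        }
        return handled;
    }
}
```

Because only the coordinator touches the source, adding segments increases handling parallelism without adding readers on the event stream.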
Modifier and Type | Class and Description |
---|---|
static class | PooledStreamingEventProcessor.Builder: Builder class to instantiate a PooledStreamingEventProcessor. |
Nested classes/interfaces inherited from interface Lifecycle: Lifecycle.LifecycleHandler, Lifecycle.LifecycleRegistry
Fields inherited from class AbstractEventProcessor: spanFactory
Modifier | Constructor and Description |
---|---|
protected | PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder builder): Instantiate a PooledStreamingEventProcessor based on the fields contained in the PooledStreamingEventProcessor.Builder. |
Modifier and Type | Method and Description |
---|---|
static PooledStreamingEventProcessor.Builder | builder(): Instantiate a Builder to be able to create a PooledStreamingEventProcessor. |
String | getTokenStoreIdentifier(): Returns the unique identifier of the TokenStore used by this StreamingEventProcessor. |
boolean | isError(): Indicates whether the processor has been shut down due to an error. |
boolean | isRunning(): Indicates whether this processor is currently running. |
int | maxCapacity(): Specifies the maximum number of segments this EventProcessor can process at the same time. |
CompletableFuture<Boolean> | mergeSegment(int segmentId): Instruct the processor to merge the segment with given segmentId back with the segment that it was originally split from. |
Map<Integer,EventTrackerStatus> | processingStatus(): Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. |
void | registerLifecycleHandlers(Lifecycle.LifecycleRegistry handle): Registers the activities to be executed in the various phases of an application's lifecycle. |
void | releaseSegment(int segmentId): Instructs the processor to release the segment with given segmentId. |
void | releaseSegment(int segmentId, long releaseDuration, TimeUnit unit): Instructs the processor to release the segment with given segmentId. |
void | resetTokens(): Resets tokens to their initial state. |
void | resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier): Reset tokens to the position as returned by the given initialTrackingTokenSupplier. |
<R> void | resetTokens(Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, R resetContext): Reset tokens to the position as returned by the given initialTrackingTokenSupplier. |
<R> void | resetTokens(R resetContext): Resets tokens to their initial state. |
void | resetTokens(TrackingToken startPosition): Resets tokens to the given startPosition. |
<R> void | resetTokens(TrackingToken startPosition, R resetContext): Resets tokens to the given startPosition. |
void | shutDown(): Stops processing events. |
CompletableFuture<Void> | shutdownAsync(): Initiates a shutdown, providing a CompletableFuture that completes when the shutdown process is finished. |
CompletableFuture<Boolean> | splitSegment(int segmentId): Instruct the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it. |
void | start(): Start processing events. |
boolean | supportsReset(): Indicates whether this StreamingEventProcessor supports a "reset". |
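Methods inherited from class AbstractEventProcessor: canHandle, canHandleType, eventHandlerInvoker, getHandlerInterceptors, getName, processInUnitOfWork, processInUnitOfWork, registerHandlerInterceptor, reportIgnored, toString
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface StreamingEventProcessor: isReplaying
Methods inherited from interface EventProcessor: getHandlerInterceptors, getName
Methods inherited from interface MessageHandlerInterceptorSupport: registerHandlerInterceptor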
protected PooledStreamingEventProcessor(PooledStreamingEventProcessor.Builder builder)
Instantiate a PooledStreamingEventProcessor based on the fields contained in the PooledStreamingEventProcessor.Builder.
Will assert the following for their presence prior to constructing this processor:
- The name.
- An EventHandlerInvoker.
- A StreamableMessageSource.
- A TokenStore.
- A TransactionManager.
- A ScheduledExecutorService for coordination.
- A ScheduledExecutorService to process work packages.
If any of these is missing, an AxonConfigurationException is thrown.
Parameters:
builder - the PooledStreamingEventProcessor.Builder used to instantiate a PooledStreamingEventProcessor instance
public static PooledStreamingEventProcessor.Builder builder()
Instantiate a Builder to be able to create a PooledStreamingEventProcessor.
Upon initialization of this builder, the following fields are defaulted:
- The RollbackConfigurationType defaults to RollbackConfigurationType.ANY_THROWABLE.
- The ErrorHandler defaults to a PropagatingErrorHandler.
- The MessageMonitor defaults to a NoOpMessageMonitor.
- The initialSegmentCount defaults to 16.
- The initialToken function defaults to StreamableMessageSource.createTailToken().
- The tokenClaimInterval defaults to 5000 milliseconds.
- The maxCapacity (used by maxCapacity()) defaults to Short.MAX_VALUE.
- The claimExtensionThreshold defaults to 5000 milliseconds.
- The batchSize defaults to 1.
- The Clock defaults to GenericEventMessage.clock.
- The SpanFactory defaults to NoOpSpanFactory.
The following fields are required and must be provided:
- The name of this EventProcessor.
- An EventHandlerInvoker which will be given the events handled by this processor.
- A StreamableMessageSource used to retrieve events.
- A TokenStore to store the progress of this processor in.
- A ScheduledExecutorService to coordinate events and segment operations.
- A ScheduledExecutorService to process work packages.
Returns: a Builder to be able to create a PooledStreamingEventProcessor
public void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry handle)
Description copied from interface: Lifecycle
Registers the activities to be executed in the various phases of an application's lifecycle.
Specified by: registerLifecycleHandlers in interface Lifecycle
Parameters:
handle - the lifecycle instance to register the handlers with
See Also: Lifecycle.LifecycleRegistry.onShutdown(int, Runnable), LifecycleRegistry#onShutdown(int, LifecycleHandler), Lifecycle.LifecycleRegistry.onStart(int, Runnable), LifecycleRegistry#onStart(int, LifecycleHandler)
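public void start()
Description copied from interface: EventProcessor
Start processing events.
Specified by: start in interface EventProcessor
public void shutDown()
Description copied from interface: EventProcessor
Stops processing events.
Specified by: shutDown in interface EventProcessor
public CompletableFuture<Void> shutdownAsync()
Description copied from interface: EventProcessor
Initiates a shutdown, providing a CompletableFuture that completes when the shutdown process is finished.
Specified by: shutdownAsync in interface EventProcessor
public boolean isRunning()
Description copied from interface: EventProcessor
Indicates whether this processor is currently running.
Specified by: isRunning in interface EventProcessor
Returns: true when running, otherwise false
public boolean isError()
Description copied from interface: EventProcessor
Indicates whether the processor has been shut down due to an error.
Note that this method returns false when the processor was stopped using EventProcessor.shutDown().
Specified by: isError in interface EventProcessor
Returns: true when paused due to an error, otherwise false
public String getTokenStoreIdentifier()
Description copied from interface: StreamingEventProcessor
Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.
Specified by: getTokenStoreIdentifier in interface StreamingEventProcessor
Returns: the unique identifier of the TokenStore used by this StreamingEventProcessor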
public void releaseSegment(int segmentId)
Description copied from interface: StreamingEventProcessor
Instructs the processor to release the segment with given segmentId.
Specified by: releaseSegment in interface StreamingEventProcessor
Parameters:
segmentId - the id of the segment to release
public void releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)
Description copied from interface: StreamingEventProcessor
Instructs the processor to release the segment with given segmentId. This processor will not try to claim the given segment for the specified releaseDuration in the given unit, to ensure it is not immediately reclaimed. Note that this will override any previous release duration that existed for this segment. Providing a negative value will allow the segment to be immediately claimed.
If the processor is not actively processing the segment with given segmentId, claiming it will be ignored for the given timeframe nonetheless.
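The release-duration rules described above (a later release overrides an earlier one, and a negative duration permits an immediate claim) can be modelled with a per-segment deadline. The sketch below is a hypothetical illustration only: `ReleaseDeadlines`, its method names, and the explicit `nowMillis` parameter are assumptions and not part of the processor's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/** Hypothetical bookkeeping of "do not claim before" deadlines per segment. */
final class ReleaseDeadlines {

    private final Map<Integer, Long> releasedUntilMillis = new ConcurrentHashMap<>();

    /** Stores the deadline now + releaseDuration, replacing any deadline recorded earlier for the segment. */
    void release(int segmentId, long releaseDuration, TimeUnit unit, long nowMillis) {
        releasedUntilMillis.put(segmentId, nowMillis + unit.toMillis(releaseDuration));
    }

    /** A segment may be claimed when it has no deadline or its deadline has been reached. */
    boolean mayClaim(int segmentId, long nowMillis) {
        Long deadline = releasedUntilMillis.get(segmentId);
        return deadline == null || nowMillis >= deadline;
    }
}
```

A negative duration yields a deadline in the past, so the very next claim attempt succeeds. The current time is passed in explicitly only to keep the sketch deterministic.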
Specified by: releaseSegment in interface StreamingEventProcessor
Parameters:
segmentId - the id of the segment to be released for the specified releaseDuration
releaseDuration - the amount of time to disregard segmentId for processing
unit - the unit of time used to express the releaseDuration
public CompletableFuture<Boolean> splitSegment(int segmentId)
Description copied from interface: StreamingEventProcessor
Instruct the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it.
To be able to split segments, the TokenStore configured with this processor must use explicitly initialized tokens. See TokenStore.requiresExplicitSegmentInitialization(). Also, the given segmentId must be currently processed by a process owned by this processor instance.
Specified by: splitSegment in interface StreamingEventProcessor
Parameters:
segmentId - the identifier of the segment to split
Returns: a CompletableFuture providing the result of the split operation
public CompletableFuture<Boolean> mergeSegment(int segmentId)
Description copied from interface: StreamingEventProcessor
Instruct the processor to merge the segment with given segmentId back with the segment that it was originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore, this other segment must not have any active claims in the TokenStore.
The processor must currently be actively processing the segment with given segmentId.
Use StreamingEventProcessor.releaseSegment(int) to force this processor to release any claims with tokens required to merge the segments.
To find out which segment a given segmentId should be merged with, use the following procedure:
    EventTrackerStatus status = processor.processingStatus().get(segmentId);
    if (status == null) {
        // this processor is not processing segmentId, and will not be able to merge
    }
    return status.getSegment().mergeableSegmentId();
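To make the relationship between a split and its later merge concrete, here is a simplified model of segments identified by an id plus a bitmask. It is a sketch under stated assumptions, not the library's Segment class: `SegmentModel` and its fields are hypothetical, and it merely assumes that splitting adds one bit to the mask and that the merge partner differs from a segment in exactly that bit.

```java
/** Hypothetical bitmask-based segment: a hash belongs to the segment when (hash & mask) == id. */
final class SegmentModel {

    final int id;
    final int mask;

    SegmentModel(int id, int mask) {
        this.id = id;
        this.mask = mask;
    }

    /** The single segment covering the entire stream. */
    static SegmentModel root() {
        return new SegmentModel(0, 0);
    }

    /** Splits this segment in two by extending the mask with one extra bit. */
    SegmentModel[] split() {
        int newMask = (mask << 1) | 1;
        int addedBit = newMask ^ mask; // the one bit the new mask adds
        return new SegmentModel[] {
                new SegmentModel(id, newMask),           // keeps the original id
                new SegmentModel(id | addedBit, newMask) // sibling with the added bit set
        };
    }

    /** Id of the sibling created by the same split; the root segment returns its own id. */
    int mergeableSegmentId() {
        int parentMask = mask >>> 1;
        int addedBit = mask ^ parentMask;
        return id ^ addedBit;
    }

    /** Whether an event with the given hash falls into this segment. */
    boolean matches(int hash) {
        return (hash & mask) == id;
    }
}
```

In this model, splitting the root yields segments 0 and 1, and splitting segment 0 again yields segments 0 and 2, each of which names the other as its merge partner.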
Specified by: mergeSegment in interface StreamingEventProcessor
Parameters:
segmentId - the identifier of the segment to merge
Returns: a CompletableFuture indicating whether the merge was executed successfully
public boolean supportsReset()
Description copied from interface: StreamingEventProcessor
Indicates whether this StreamingEventProcessor supports a "reset". Generally, a reset is supported if at least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly prevent the resets.
This method should be invoked prior to invoking any of the StreamingEventProcessor.resetTokens() operations as an early validation.
Specified by: supportsReset in interface StreamingEventProcessor
Returns: true if resets are supported, false otherwise
public void resetTokens()
Description copied from interface: StreamingEventProcessor
Resets tokens to their initial state.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
Specified by: resetTokens in interface StreamingEventProcessor
public <R> void resetTokens(R resetContext)
Description copied from interface: StreamingEventProcessor
Resets tokens to their initial state. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
Specified by: resetTokens in interface StreamingEventProcessor
Type Parameters:
R - the type of the provided resetContext
Parameters:
resetContext - an R used to support the reset operation
public void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier)
Description copied from interface: StreamingEventProcessor
Reset tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes a replay of events since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
Specified by: resetTokens in interface StreamingEventProcessor
Parameters:
initialTrackingTokenSupplier - a function returning the token representing the position to reset to
public <R> void resetTokens(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialTrackingTokenSupplier, R resetContext)
Description copied from interface: StreamingEventProcessor
Reset tokens to the position as returned by the given initialTrackingTokenSupplier. This effectively causes a replay of events since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
Specified by: resetTokens in interface StreamingEventProcessor
Type Parameters:
R - the type of the provided resetContext
Parameters:
initialTrackingTokenSupplier - a function returning the token representing the position to reset to
resetContext - an R used to support the reset operation
public void resetTokens(@Nonnull TrackingToken startPosition)
Description copied from interface: StreamingEventProcessor
Resets tokens to the given startPosition. This effectively causes a replay of events since that position.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
Specified by: resetTokens in interface StreamingEventProcessor
Parameters:
startPosition - the token representing the position to reset the processor to
public <R> void resetTokens(@Nonnull TrackingToken startPosition, R resetContext)
Description copied from interface: StreamingEventProcessor
Resets tokens to the given startPosition. This effectively causes a replay of events since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.
Note that the new token must represent a position that is before the current position of the processor.
Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.
Specified by: resetTokens in interface StreamingEventProcessor
Type Parameters:
R - the type of the provided resetContext
Parameters:
startPosition - the token representing the position to reset the processor to
resetContext - an R used to support the reset operation
public int maxCapacity()
Specifies the maximum number of segments this EventProcessor can process at the same time.
The maximum capacity of the PooledStreamingEventProcessor defaults to Short.MAX_VALUE. If required, this value can be adjusted through the PooledStreamingEventProcessor.Builder.maxClaimedSegments(int) method.
Specified by: maxCapacity in interface StreamingEventProcessor
Returns: the maximum number of segments this EventProcessor can process at the same time
public Map<Integer,EventTrackerStatus> processingStatus()
Description copied from interface: StreamingEventProcessor
Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. The keys of the Map represent the segment ids processed by this instance. The values of the returned Map represent the last known status of that segment.
Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the processor is processing Events.
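A map that is unmodifiable for callers yet still reflects later updates is what `Collections.unmodifiableMap` provides: it returns a read-only view over the backing map, not a copy. The sketch below illustrates that behaviour only. `StatusViewSketch` is hypothetical, plain strings stand in for EventTrackerStatus, and whether the processor uses this exact mechanism internally is an assumption.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical holder exposing a live, read-only view of per-segment statuses. */
final class StatusViewSketch {

    private final Map<Integer, String> statuses = new ConcurrentHashMap<>();
    // A view, not a copy: reads go through to the backing map, writes are rejected.
    private final Map<Integer, String> readOnlyView = Collections.unmodifiableMap(statuses);

    /** Called by the processing side whenever the status of a segment changes. */
    void update(int segmentId, String status) {
        statuses.put(segmentId, status);
    }

    /** Callers can observe the statuses but cannot change them. */
    Map<Integer, String> processingStatus() {
        return readOnlyView;
    }
}
```

A caller that keeps a reference to the returned map sees new segments and updated statuses without asking again, while any attempt to call `put` on it throws an `UnsupportedOperationException`.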
Specified by: processingStatus in interface StreamingEventProcessor
Copyright © 2010–2023. All rights reserved.