public static class PooledStreamingEventProcessor.Builder extends AbstractEventProcessor.Builder
PooledStreamingEventProcessor.
Upon initialization of this builder, the following fields are defaulted:
RollbackConfigurationType defaults to a RollbackConfigurationType.ANY_THROWABLE.ErrorHandler is defaulted to a PropagatingErrorHandler.MessageMonitor defaults to a NoOpMessageMonitor.initialSegmentCount defaults to 16.initialToken function defaults to a ReplayToken that starts streaming
from the tail with the replay flag enabled until the
head at the moment of initialization is reached.tokenClaimInterval defaults to 5000 milliseconds.MaxSegmentProvider (used by PooledStreamingEventProcessor.maxCapacity()) defaults to MaxSegmentProvider.maxShort().claimExtensionThreshold defaults to 5000 milliseconds.batchSize defaults to 1.Clock defaults to GenericEventMessage.clock.EventProcessorSpanFactory defaults to a DefaultEventProcessorSpanFactory backed by a NoOpSpanFactory.coordinatorExtendsClaims defaults to a false.EventProcessor.EventHandlerInvoker which will be given the events handled by this processorStreamableMessageSource used to retrieve events.TokenStore to store the progress of this processor in.ScheduledExecutorService to coordinate events and segment operations.ScheduledExecutorService to process work packages.name| Modifier | Constructor and Description |
|---|---|
protected |
Builder() |
public PooledStreamingEventProcessor.Builder name(@Nonnull String name)
AbstractEventProcessor.Buildername of this EventProcessor implementation.name in class AbstractEventProcessor.Buildername - a String defining this EventProcessor implementationpublic PooledStreamingEventProcessor.Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker)
AbstractEventProcessor.BuilderEventHandlerInvoker which will handle all the individual EventMessages.eventHandlerInvoker in class AbstractEventProcessor.BuildereventHandlerInvoker - the EventHandlerInvoker which will handle all the individual
EventMessagespublic PooledStreamingEventProcessor.Builder rollbackConfiguration(@Nonnull RollbackConfiguration rollbackConfiguration)
AbstractEventProcessor.BuilderRollbackConfiguration specifying the rollback behavior of the UnitOfWork while
processing a batch of events.rollbackConfiguration in class AbstractEventProcessor.BuilderrollbackConfiguration - the RollbackConfiguration specifying the rollback behavior of the
UnitOfWork while processing a batch of events.public PooledStreamingEventProcessor.Builder errorHandler(@Nonnull ErrorHandler errorHandler)
AbstractEventProcessor.BuilderErrorHandler invoked when an UnitOfWork is rolled back during processing. Defaults
to a PropagatingErrorHandler.errorHandler in class AbstractEventProcessor.BuildererrorHandler - the ErrorHandler invoked when an UnitOfWork is rolled back during
processingpublic PooledStreamingEventProcessor.Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor)
AbstractEventProcessor.BuilderMessageMonitor to monitor EventMessages before and after they're processed. Defaults
to a NoOpMessageMonitor.messageMonitor in class AbstractEventProcessor.BuildermessageMonitor - a MessageMonitor to monitor EventMessages before and after they're
processedpublic PooledStreamingEventProcessor.Builder spanFactory(@Nonnull EventProcessorSpanFactory spanFactory)
AbstractEventProcessor.BuilderEventProcessorSpanFactory implementation to use for providing tracing capabilities. Defaults
to a DefaultEventProcessorSpanFactory backed by a NoOpSpanFactory by default, which provides
no tracing capabilities.spanFactory in class AbstractEventProcessor.BuilderspanFactory - The SpanFactory implementation@Deprecated public PooledStreamingEventProcessor.Builder spanFactory(@Nonnull SpanFactory spanFactory)
AbstractEventProcessor.BuilderSpanFactory implementation to use for providing tracing capabilities. Defaults to a
NoOpSpanFactory by default, which provides no tracing capabilities.spanFactory in class AbstractEventProcessor.BuilderspanFactory - The SpanFactory implementationpublic PooledStreamingEventProcessor.Builder messageSource(@Nonnull StreamableMessageSource<TrackedEventMessage<?>> messageSource)
messageSource - the StreamableMessageSource (e.g. the EventStore) which this
EventProcessor will trackpublic PooledStreamingEventProcessor.Builder tokenStore(@Nonnull TokenStore tokenStore)
TokenStore used to store and fetch event tokens that enable this EventProcessor to
track its progress.tokenStore - the TokenStore used to store and fetch event tokens that enable this
EventProcessor to track its progresspublic PooledStreamingEventProcessor.Builder transactionManager(@Nonnull TransactionManager transactionManager)
TransactionManager used when processing EventMessages.transactionManager - the TransactionManager used when processing EventMessagespublic PooledStreamingEventProcessor.Builder coordinatorExecutor(@Nonnull ScheduledExecutorService coordinatorExecutor)
ScheduledExecutorService used by the coordinator of this
PooledStreamingEventProcessor.coordinatorExecutor - the ScheduledExecutorService to be used by the the coordinator of this
PooledStreamingEventProcessorpublic PooledStreamingEventProcessor.Builder coordinatorExecutor(@Nonnull Function<String,ScheduledExecutorService> coordinatorExecutorBuilder)
ScheduledExecutorService used by the coordinator of this
PooledStreamingEventProcessor.coordinatorExecutorBuilder - a builder function to construct a ScheduledExecutorService,
providing the PooledStreamingEventProcessor@Deprecated public PooledStreamingEventProcessor.Builder workerExecutorService(@Nonnull ScheduledExecutorService workerExecutor)
workerExecutor(ScheduledExecutorService)ScheduledExecutorService to be provided to the WorkPackages created by this
PooledStreamingEventProcessor.workerExecutor - the ScheduledExecutorService to be provided to the WorkPackages created
by this PooledStreamingEventProcessorpublic PooledStreamingEventProcessor.Builder workerExecutor(@Nonnull ScheduledExecutorService workerExecutor)
ScheduledExecutorService to be provided to the WorkPackages created by this
PooledStreamingEventProcessor.workerExecutor - the ScheduledExecutorService to be provided to the WorkPackages created
by this PooledStreamingEventProcessorpublic PooledStreamingEventProcessor.Builder workerExecutor(@Nonnull Function<String,ScheduledExecutorService> workerExecutorBuilder)
ScheduledExecutorService to be provided to the
WorkPackages created by this PooledStreamingEventProcessor.workerExecutorBuilder - a builder function to construct a ScheduledExecutorService, providing
the PooledStreamingEventProcessorpublic PooledStreamingEventProcessor.Builder initialSegmentCount(int initialSegmentCount)
TokenStore upon start up of this StreamingEventProcessor. The given
value should at least be 1. Defaults to 16.initialSegmentCount - an int specifying the initial segment count used to create segments on
start uppublic PooledStreamingEventProcessor.Builder initialToken(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialToken)
Function used to generate the initial TrackingTokens. The function will be
given the configured StreamableMessageSource' so that its methods can be invoked for token creation.
Defaults to an automatic replay since the start of the stream.
More specifically, it defaults to a ReplayToken that starts streaming
from the tail with the replay flag enabled until the
head at the moment of initialization is reached.
initialToken - a Function generating the initial TrackingToken based on a given
StreamableMessageSourcepublic PooledStreamingEventProcessor.Builder tokenClaimInterval(long tokenClaimInterval)
5000
milliseconds.tokenClaimInterval - the time in milliseconds the processor's coordinator should wait after a failed
attempt to claim any segments for processingpublic PooledStreamingEventProcessor.Builder maxClaimedSegments(int maxClaimedSegments)
maxClaimedSegments - The maximum number of segments this instance may claim.public PooledStreamingEventProcessor.Builder maxSegmentProvider(MaxSegmentProvider maxSegmentProvider)
StreamingEventProcessor may claim per instance. Defaults
to MaxSegmentProvider.maxShort().maxSegmentProvider - A MaxSegmentProvider providing the maximum number segments this
StreamingEventProcessor may claim per instance.public PooledStreamingEventProcessor.Builder claimExtensionThreshold(long claimExtensionThreshold)
TrackingToken. The threshold will only be met in absence of regular event processing, since that
updates the TrackingToken automatically. Defaults to 5000 milliseconds.claimExtensionThreshold - a time in milliseconds the work packages of this processor should extend the
claim on a TrackingToken.public PooledStreamingEventProcessor.Builder batchSize(int batchSize)
1.
Increasing this value with increase the processing speed dramatically, but requires certainty that the operations performed during event handling can be rolled back.
batchSize - the number of events to be processed inside a single transactionpublic PooledStreamingEventProcessor.Builder clock(@Nonnull Clock clock)
Clock used for time dependent operation by this EventProcessor. Used by the
Coordinator and WorkPackage threads to decide when to perform certain tasks, like updating
TrackingToken claims or when to unmark a Segment as "unclaimable". Defaults to
GenericEventMessage.clock.clock - the Clock used for time dependent operation by this EventProcessorpublic PooledStreamingEventProcessor.Builder enableCoordinatorClaimExtension()
Coordinator to extend the claims of its
WorkPackages.
Enabling "coordinator claim extension" is an optimization as it relieves this effort from the
WorkPackage. Toggling this feature may be particularly useful whenever the event handling task of the
WorkPackage is lengthy. Either because of a hefty event handling component or because of a
large batchSize(int).
An example of a lengthy processing tasks is whenever handling a batch of events exceeds half the
claimTimeout of the TokenStore. The claimTimeout defaults to 10 seconds for all
durable TokenStore implementations.
In both scenarios, there's a window of opportunity that the WorkPackage is not fast enough in
extending the claim itself. Not being able to do so potentially causes token stealing by other instances of
this PooledStreamingEventProcessor, thus overburdening the overall event processing task.
Note that enabling this feature will result in more frequent invocation of the TokenStore to update
the tokens.
public PooledStreamingEventProcessor build()
PooledStreamingEventProcessor as specified through this Builder.PooledStreamingEventProcessor as specified through this Builderprotected void validate()
throws AxonConfigurationException
AbstractEventProcessor.Buildervalidate in class AbstractEventProcessor.BuilderAxonConfigurationException - if one field is asserted to be incorrect according to the Builder's
specificationspublic String name()
PooledStreamingEventProcessor.PooledStreamingEventProcessorCopyright © 2010–2025. All rights reserved.