public static class PooledStreamingEventProcessor.Builder extends AbstractEventProcessor.Builder
Builder class to instantiate a PooledStreamingEventProcessor.
Upon initialization of this builder, the following fields are defaulted:
- The RollbackConfigurationType defaults to RollbackConfigurationType.ANY_THROWABLE.
- The ErrorHandler defaults to a PropagatingErrorHandler.
- The MessageMonitor defaults to a NoOpMessageMonitor.
- The initialSegmentCount defaults to 16.
- The initialToken function defaults to StreamableMessageSource.createTailToken().
- The tokenClaimInterval defaults to 5000 milliseconds.
- The MaxSegmentProvider (used by PooledStreamingEventProcessor.maxCapacity()) defaults to MaxSegmentProvider.maxShort().
- The claimExtensionThreshold defaults to 5000 milliseconds.
- The batchSize defaults to 1.
- The Clock defaults to GenericEventMessage.clock.
- The EventProcessorSpanFactory defaults to a DefaultEventProcessorSpanFactory backed by a NoOpSpanFactory.
- The coordinatorExtendsClaims flag defaults to false.

The following fields of this builder are hard requirements and must be provided:
- The name of this EventProcessor.
- An EventHandlerInvoker which will be given the events handled by this processor.
- A StreamableMessageSource used to retrieve events.
- A TokenStore to store the progress of this processor in.
- A ScheduledExecutorService to coordinate events and segment operations.
- A ScheduledExecutorService to process work packages.

Modifier | Constructor and Description |
---|---|
protected | Builder() |

public PooledStreamingEventProcessor.Builder name(@Nonnull String name)
Description copied from class: AbstractEventProcessor.Builder
Sets the name of this EventProcessor implementation.
Overrides:
name in class AbstractEventProcessor.Builder
Parameters:
name - a String defining this EventProcessor implementation

public PooledStreamingEventProcessor.Builder eventHandlerInvoker(@Nonnull EventHandlerInvoker eventHandlerInvoker)
Description copied from class: AbstractEventProcessor.Builder
Sets the EventHandlerInvoker which will handle all the individual EventMessages.
Overrides:
eventHandlerInvoker in class AbstractEventProcessor.Builder
Parameters:
eventHandlerInvoker - the EventHandlerInvoker which will handle all the individual EventMessages

public PooledStreamingEventProcessor.Builder rollbackConfiguration(@Nonnull RollbackConfiguration rollbackConfiguration)
Description copied from class: AbstractEventProcessor.Builder
Sets the RollbackConfiguration specifying the rollback behavior of the UnitOfWork while processing a batch of events.
Overrides:
rollbackConfiguration in class AbstractEventProcessor.Builder
Parameters:
rollbackConfiguration - the RollbackConfiguration specifying the rollback behavior of the UnitOfWork while processing a batch of events

public PooledStreamingEventProcessor.Builder errorHandler(@Nonnull ErrorHandler errorHandler)
Description copied from class: AbstractEventProcessor.Builder
Sets the ErrorHandler invoked when a UnitOfWork is rolled back during processing. Defaults to a PropagatingErrorHandler.
Overrides:
errorHandler in class AbstractEventProcessor.Builder
Parameters:
errorHandler - the ErrorHandler invoked when a UnitOfWork is rolled back during processing

public PooledStreamingEventProcessor.Builder messageMonitor(@Nonnull MessageMonitor<? super EventMessage<?>> messageMonitor)
Description copied from class: AbstractEventProcessor.Builder
Sets the MessageMonitor to monitor EventMessages before and after they're processed. Defaults to a NoOpMessageMonitor.
Overrides:
messageMonitor in class AbstractEventProcessor.Builder
Parameters:
messageMonitor - a MessageMonitor to monitor EventMessages before and after they're processed

public PooledStreamingEventProcessor.Builder spanFactory(@Nonnull EventProcessorSpanFactory spanFactory)
Description copied from class: AbstractEventProcessor.Builder
Sets the EventProcessorSpanFactory implementation to use for providing tracing capabilities. Defaults to a DefaultEventProcessorSpanFactory backed by a NoOpSpanFactory, which provides no tracing capabilities.
Overrides:
spanFactory in class AbstractEventProcessor.Builder
Parameters:
spanFactory - the EventProcessorSpanFactory implementation

@Deprecated public PooledStreamingEventProcessor.Builder spanFactory(@Nonnull SpanFactory spanFactory)
Description copied from class: AbstractEventProcessor.Builder
Sets the SpanFactory implementation to use for providing tracing capabilities. Defaults to a NoOpSpanFactory, which provides no tracing capabilities.
Overrides:
spanFactory in class AbstractEventProcessor.Builder
Parameters:
spanFactory - the SpanFactory implementation

public PooledStreamingEventProcessor.Builder messageSource(@Nonnull StreamableMessageSource<TrackedEventMessage<?>> messageSource)
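Sets the StreamableMessageSource (e.g. the EventStore) which this EventProcessor will track.
Parameters:
messageSource - the StreamableMessageSource (e.g. the EventStore) which this EventProcessor will track

public PooledStreamingEventProcessor.Builder tokenStore(@Nonnull TokenStore tokenStore)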
Sets the TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
Parameters:
tokenStore - the TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress

public PooledStreamingEventProcessor.Builder transactionManager(@Nonnull TransactionManager transactionManager)
Sets the TransactionManager used when processing EventMessages.
Parameters:
transactionManager - the TransactionManager used when processing EventMessages

public PooledStreamingEventProcessor.Builder coordinatorExecutor(@Nonnull ScheduledExecutorService coordinatorExecutor)
Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
Parameters:
coordinatorExecutor - the ScheduledExecutorService to be used by the coordinator of this PooledStreamingEventProcessor

public PooledStreamingEventProcessor.Builder coordinatorExecutor(@Nonnull Function<String,ScheduledExecutorService> coordinatorExecutorBuilder)
Specifies a builder function to construct the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
Parameters:
coordinatorExecutorBuilder - a builder function to construct a ScheduledExecutorService, given the name of this PooledStreamingEventProcessor

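As an illustration of the Function<String,ScheduledExecutorService> shape this method accepts, the following JDK-only sketch builds a single-threaded scheduler whose thread is named after the String it receives. The class name, the helper names, and the "Coordinator" prefix are hypothetical, and the sketch assumes the String handed to the function is the processor's name. It does not depend on Axon itself.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class CoordinatorExecutorSketch {

    // Hypothetical helper: returns a builder function that creates a single-threaded scheduler.
    // The String argument is assumed to be the processor's name and ends up in the thread name.
    static Function<String, ScheduledExecutorService> namedSingleThreadScheduler(String prefix) {
        return processorName -> {
            ThreadFactory threadFactory = runnable -> {
                Thread thread = new Thread(runnable, prefix + "[" + processorName + "]");
                thread.setDaemon(true);
                return thread;
            };
            return Executors.newSingleThreadScheduledExecutor(threadFactory);
        };
    }

    // Builds an executor for the given name, runs one task on it, and returns the name of
    // the thread that executed the task. The executor is shut down afterwards.
    static String coordinatorThreadName(String processorName) {
        ScheduledExecutorService executor =
                namedSingleThreadScheduler("Coordinator").apply(processorName);
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("Could not run task on the executor", e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(coordinatorThreadName("my-processor")); // Coordinator[my-processor]
    }
}
```

A function like namedSingleThreadScheduler("Coordinator") could then be passed to coordinatorExecutor(Function), so that thread dumps show which processor a coordinator thread belongs to.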
@Deprecated public PooledStreamingEventProcessor.Builder workerExecutorService(@Nonnull ScheduledExecutorService workerExecutor)
Deprecated. Use workerExecutor(ScheduledExecutorService) instead.
Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
Parameters:
workerExecutor - the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor

public PooledStreamingEventProcessor.Builder workerExecutor(@Nonnull ScheduledExecutorService workerExecutor)
Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
Parameters:
workerExecutor - the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor

public PooledStreamingEventProcessor.Builder workerExecutor(@Nonnull Function<String,ScheduledExecutorService> workerExecutorBuilder)
Specifies a builder function to construct the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
Parameters:
workerExecutorBuilder - a builder function to construct a ScheduledExecutorService, given the name of this PooledStreamingEventProcessor

public PooledStreamingEventProcessor.Builder initialSegmentCount(int initialSegmentCount)
Sets the initial segment count used to create segments in the TokenStore upon start up of this StreamingEventProcessor. The given value should be at least 1. Defaults to 16.
Parameters:
initialSegmentCount - an int specifying the initial segment count used to create segments on start up

public PooledStreamingEventProcessor.Builder initialToken(@Nonnull Function<StreamableMessageSource<TrackedEventMessage<?>>,TrackingToken> initialToken)
Specifies the Function used to generate the initial TrackingTokens. The function will be given the configured StreamableMessageSource so that its methods can be invoked for token creation.
Defaults to an automatic replay since the start of the stream. More specifically, it defaults to a ReplayToken that starts streaming from the tail with the replay flag enabled until the head at the moment of initialization is reached.
Parameters:
initialToken - a Function generating the initial TrackingToken based on a given StreamableMessageSource

public PooledStreamingEventProcessor.Builder tokenClaimInterval(long tokenClaimInterval)
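Specifies the time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing. Defaults to 5000 milliseconds.
Parameters:
tokenClaimInterval - the time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing

public PooledStreamingEventProcessor.Builder maxClaimedSegments(int maxClaimedSegments)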
Sets the maximum number of segments this instance may claim.
Parameters:
maxClaimedSegments - the maximum number of segments this instance may claim

public PooledStreamingEventProcessor.Builder maxSegmentProvider(MaxSegmentProvider maxSegmentProvider)
Sets the MaxSegmentProvider providing the maximum number of segments this StreamingEventProcessor may claim per instance. Defaults to MaxSegmentProvider.maxShort().
Parameters:
maxSegmentProvider - a MaxSegmentProvider providing the maximum number of segments this StreamingEventProcessor may claim per instance

public PooledStreamingEventProcessor.Builder claimExtensionThreshold(long claimExtensionThreshold)
Specifies a time in milliseconds after which the work packages of this processor should extend the claim on a TrackingToken. The threshold will only be met in the absence of regular event processing, since that updates the TrackingToken automatically. Defaults to 5000 milliseconds.
Parameters:
claimExtensionThreshold - a time in milliseconds after which the work packages of this processor should extend the claim on a TrackingToken

public PooledStreamingEventProcessor.Builder batchSize(int batchSize)
Specifies the number of events to be processed inside a single transaction. Defaults to 1.
Increasing this value will increase the processing speed dramatically, but requires certainty that the operations performed during event handling can be rolled back.
Parameters:
batchSize - the number of events to be processed inside a single transaction

public PooledStreamingEventProcessor.Builder clock(@Nonnull Clock clock)
Defines the Clock used for time-dependent operations by this EventProcessor. Used by the Coordinator and WorkPackage threads to decide when to perform certain tasks, like updating TrackingToken claims or when to unmark a Segment as "unclaimable". Defaults to GenericEventMessage.clock.
Parameters:
clock - the Clock used for time-dependent operations by this EventProcessor

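To show why a replaceable Clock helps with time-based decisions, here is a minimal JDK-only sketch. The thresholdReached method is a hypothetical stand-in for a check such as "has the claim extension threshold elapsed?"; it is not the processor's actual logic. Swapping in Clock.fixed or Clock.offset makes the outcome deterministic, which is mainly useful in tests.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

class ClaimExtensionClockSketch {

    // Hypothetical time-based decision: returns true once at least `threshold` has elapsed
    // on the given clock since `lastClaim`.
    static boolean thresholdReached(Clock clock, Instant lastClaim, Duration threshold) {
        Duration elapsed = Duration.between(lastClaim, clock.instant());
        return elapsed.compareTo(threshold) >= 0;
    }

    public static void main(String[] args) {
        Instant lastClaim = Instant.parse("2024-01-01T00:00:00Z");
        Duration threshold = Duration.ofMillis(5000); // the documented default threshold

        // A fixed clock 4999 ms after the last claim, and one shifted 1 ms further.
        Clock early = Clock.fixed(lastClaim.plusMillis(4999), ZoneOffset.UTC);
        Clock late = Clock.offset(early, Duration.ofMillis(1));

        System.out.println(thresholdReached(early, lastClaim, threshold)); // false
        System.out.println(thresholdReached(late, lastClaim, threshold));  // true
    }
}
```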
public PooledStreamingEventProcessor.Builder enableCoordinatorClaimExtension()
Enables the Coordinator to extend the claims of its WorkPackages.
Enabling "coordinator claim extension" is an optimization, as it relieves the WorkPackage of this effort. Toggling this feature may be particularly useful whenever the event handling task of the WorkPackage is lengthy, either because of a hefty event handling component or because of a large batchSize(int).
An example of a lengthy processing task is whenever handling a batch of events exceeds half the claimTimeout of the TokenStore. The claimTimeout defaults to 10 seconds for all durable TokenStore implementations.
In both scenarios, there's a window of opportunity in which the WorkPackage is not fast enough in extending the claim itself. Not being able to do so potentially causes token stealing by other instances of this PooledStreamingEventProcessor, thus overburdening the overall event processing task.
Note that enabling this feature will result in more frequent invocation of the TokenStore to update the tokens.

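The "half the claimTimeout" rule of thumb above can be checked with simple arithmetic. The sketch below is JDK-only and hypothetical: the class and method names are invented for illustration, and estimating the batch handling time as per-event time multiplied by the batch size is an assumption, not something the processor computes.

```java
import java.time.Duration;

class ClaimTimeoutSketch {

    // The 10 second claimTimeout default mentioned in the text above.
    static final Duration DEFAULT_CLAIM_TIMEOUT = Duration.ofSeconds(10);

    // Batch handling counts as "lengthy" when it takes longer than half the claimTimeout.
    static boolean isLengthy(Duration batchHandlingTime, Duration claimTimeout) {
        return batchHandlingTime.compareTo(claimTimeout.dividedBy(2)) > 0;
    }

    // Assumed rough estimate: per-event handling time multiplied by the batch size.
    static Duration estimateBatchTime(Duration perEvent, int batchSize) {
        return perEvent.multipliedBy(batchSize);
    }

    public static void main(String[] args) {
        Duration perEvent = Duration.ofMillis(60);

        // 60 ms for a batch of 1 stays well below the 5 second mark.
        System.out.println(isLengthy(estimateBatchTime(perEvent, 1), DEFAULT_CLAIM_TIMEOUT));   // false
        // 60 ms * 100 = 6 seconds, which exceeds half of the 10 second claimTimeout.
        System.out.println(isLengthy(estimateBatchTime(perEvent, 100), DEFAULT_CLAIM_TIMEOUT)); // true
    }
}
```

When a check like this returns true for a realistic workload, enabling coordinator claim extension is worth considering, at the cost of the extra TokenStore invocations noted above.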
public PooledStreamingEventProcessor build()
Initializes a PooledStreamingEventProcessor as specified through this Builder.
Returns:
a PooledStreamingEventProcessor as specified through this Builder

protected void validate() throws AxonConfigurationException
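Description copied from class: AbstractEventProcessor.Builder
Validates whether the fields contained in this Builder are set as required.
Overrides:
validate in class AbstractEventProcessor.Builder
Throws:
AxonConfigurationException - if one field is asserted to be incorrect according to the Builder's specifications

public String name()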
Returns the name of this PooledStreamingEventProcessor.
Returns:
the name of this PooledStreamingEventProcessor

Copyright © 2010–2024. All rights reserved.