Class PooledStreamingEventProcessorConfiguration
- All Implemented Interfaces:
DescribableComponent
Configuration class for a PooledStreamingEventProcessor.
Upon initialization of this configuration, the following fields are defaulted:
- The ErrorHandler defaults to a PropagatingErrorHandler.
- The initialSegmentCount defaults to 16.
- The initialToken function defaults to a ReplayToken that starts streaming from the tail with the replay flag enabled until the head at the moment of initialization is reached.
- The tokenClaimInterval defaults to 5000 milliseconds.
- The MaxSegmentProvider (used by PooledStreamingEventProcessor.maxCapacity()) defaults to MaxSegmentProvider.maxShort().
- The claimExtensionThreshold defaults to 5000 milliseconds.
- The batchSize defaults to 1.
- The Clock defaults to GenericEventMessage.clock.
- The coordinatorExtendsClaims flag defaults to false.
The following fields are required and must be provided:
- A StreamableEventSource used to retrieve events.
- A TokenStore to store the progress of this processor in.
- A ScheduledExecutorService to coordinate events and segment operations.
- A ScheduledExecutorService to process work packages.
- Since:
- 5.0.0
- Author:
- Mateusz Nowak
-
Field Summary
Fields inherited from class org.axonframework.messaging.eventhandling.configuration.EventProcessorConfiguration
errorHandler, interceptorBuilder, interceptors, monitorBuilder, processorName, unitOfWorkFactory
-
Constructor Summary
Constructors
- PooledStreamingEventProcessorConfiguration(EventProcessorConfiguration base): Constructs a new PooledStreamingEventProcessorConfiguration copying properties from the given configuration.
- PooledStreamingEventProcessorConfiguration(EventProcessorConfiguration base, Configuration configuration): Constructs a new PooledStreamingEventProcessorConfiguration with default values, retrieving global default values from the given configuration.
-
Method Summary
Methods (modifier and type shown where available)
- addSegmentChangeListener(SegmentChangeListener segmentChangeListener): Adds a listener invoked when segments are claimed or released.
- int batchSize(): Returns the number of events processed in a single transaction.
- batchSize(int batchSize): Specifies the number of events to be processed inside a single transaction.
- long claimExtensionThreshold(): Returns the claim extension threshold in milliseconds.
- claimExtensionThreshold(long claimExtensionThreshold): Specifies a time in milliseconds the work packages of this processor should extend the claim on a TrackingToken.
- clock(): Returns the Clock used for time-dependent operations.
- clock(Clock clock): Defines the Clock used for time-dependent operations by this EventProcessor.
- coordinatorExecutor(): Returns the coordinator's ScheduledExecutorService.
- coordinatorExecutor(ScheduledExecutorService coordinatorExecutor): Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- coordinatorExecutor(Supplier<ScheduledExecutorService> coordinatorExecutor): Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- boolean coordinatorExtendsClaims(): Returns whether the coordinator extends claims for work packages.
- deadLetterQueue(): Returns the merged DeadLetterQueueConfiguration by applying all customizations.
- deadLetterQueue(UnaryOperator<DeadLetterQueueConfiguration> customization): Configures the Dead Letter Queue (DLQ) for this processor using a customization function.
- void describeTo(ComponentDescriptor descriptor): Describe the properties of this DescribableComponent with the given descriptor.
- enableCoordinatorClaimExtension(): Enables the Coordinator to extend the claims of its WorkPackages.
- errorHandler(ErrorHandler errorHandler): Sets the ErrorHandler invoked when a UnitOfWork throws an exception during processing.
- eventCriteria(Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider): Sets the function to build the EventCriteria used to filter events when opening the event source.
- eventCriteriaProvider(): Returns the function to build EventCriteria from supported event types.
- eventSource(): Returns the StreamableEventSource used to track events.
- eventSource(StreamableEventSource eventSource)
- Consumer<? super EventMessage> ignoredMessageHandler(): Returns the handler for ignored messages.
- ignoredMessageHandler(Consumer<? super EventMessage> ignoredMessageHandler): Sets the handler that is invoked when the event is ignored by all WorkPackages this Coordinator controls.
- int initialSegmentCount(): Returns the initial segment count used on startup.
- initialSegmentCount(int initialSegmentCount): Sets the initial segment count used to create segments on startup.
- initialToken(): Returns the function used to generate initial TrackingTokens.
- initialToken(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken): Specifies the Function used to generate the initial TrackingTokens.
- List<MessageHandlerInterceptor<? super EventMessage>> interceptors(): Returns the list of EventMessage-specific MessageHandlerInterceptors to add to the PooledStreamingEventProcessor under construction with this configuration implementation.
- maxClaimedSegments(int maxClaimedSegments): Sets the maximum number of segments this instance may claim.
- maxSegmentProvider(): Returns the MaxSegmentProvider defining maximum claimable segments.
- maxSegmentProvider(MaxSegmentProvider maxSegmentProvider): Defines the maximum number of segments this StreamingEventProcessor may claim per instance.
- schedulingProcessingContextProvider(): Returns the Supplier providing the ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage.
- schedulingProcessingContextProvider(Supplier<ProcessingContext> schedulingProcessingContextProvider): Provides a ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage.
- segmentChangeListener(): Returns the configured segment change listener composed from all registered listeners.
- boolean streaming(): Returns whether this configuration is for a streaming event processor.
- long tokenClaimInterval(): Returns the token claim interval in milliseconds.
- tokenClaimInterval(long tokenClaimInterval): Specifies the time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing.
- tokenStore(): Returns the TokenStore used to store and fetch event tokens.
- tokenStore(TokenStore tokenStore): Sets the TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
- unitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory): A UnitOfWorkFactory that spawns UnitOfWork used to process an event batch.
- protected void validate(): Validates whether the fields contained in this Builder are set accordingly.
- withInterceptor(MessageHandlerInterceptor<? super EventMessage> interceptor): Registers the given EventMessage-specific MessageHandlerInterceptor for the PooledStreamingEventProcessor under construction.
- workerExecutor(): Returns the worker's ScheduledExecutorService.
- workerExecutor(ScheduledExecutorService workerExecutor): Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- workerExecutor(Supplier<ScheduledExecutorService> workerExecutor): Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
Methods inherited from class org.axonframework.messaging.eventhandling.configuration.EventProcessorConfiguration
errorHandler, unitOfWorkFactory
-
Constructor Details
-
PooledStreamingEventProcessorConfiguration
Constructs a new PooledStreamingEventProcessorConfiguration copying properties from the given configuration.
- Parameters:
base - The EventProcessorConfiguration to copy properties from.
-
PooledStreamingEventProcessorConfiguration
@Internal public PooledStreamingEventProcessorConfiguration(EventProcessorConfiguration base, Configuration configuration)
Constructs a new PooledStreamingEventProcessorConfiguration with default values, retrieving global default values from the given configuration.
- Parameters:
base - The EventProcessorConfiguration to copy properties from.
configuration - The configuration, used to retrieve global default values, like MessageHandlerInterceptors, from.
-
-
Method Details
-
errorHandler
Description copied from class: EventProcessorConfiguration
Sets the ErrorHandler invoked when a UnitOfWork throws an exception during processing. Defaults to a PropagatingErrorHandler.
- Overrides:
errorHandler in class EventProcessorConfiguration
- Parameters:
errorHandler - The ErrorHandler invoked when a UnitOfWork throws an exception during processing.
- Returns:
- The current instance, for fluent interfacing.
-
unitOfWorkFactory
public PooledStreamingEventProcessorConfiguration unitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory)
Description copied from class: EventProcessorConfiguration
A UnitOfWorkFactory that spawns UnitOfWork used to process an event batch.
- Overrides:
unitOfWorkFactory in class EventProcessorConfiguration
- Parameters:
unitOfWorkFactory - A UnitOfWorkFactory that spawns UnitOfWork.
- Returns:
- The current instance, for fluent interfacing.
-
eventSource
- Parameters:
eventSource - The StreamableEventSource (e.g. the EventStore) which this EventProcessor will track.
- Returns:
- The current instance, for fluent interfacing.
-
withInterceptor
public PooledStreamingEventProcessorConfiguration withInterceptor(MessageHandlerInterceptor<? super EventMessage> interceptor)
Registers the given EventMessage-specific MessageHandlerInterceptor for the PooledStreamingEventProcessor under construction.
- Parameters:
interceptor - The EventMessage-specific MessageHandlerInterceptor to register for the PooledStreamingEventProcessor under construction.
- Returns:
- This PooledStreamingEventProcessorConfiguration, for fluent interfacing.
-
tokenStore
Sets the TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
- Parameters:
tokenStore - The TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
- Returns:
- The current instance, for fluent interfacing.
-
coordinatorExecutor
public PooledStreamingEventProcessorConfiguration coordinatorExecutor(ScheduledExecutorService coordinatorExecutor)
Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- Parameters:
coordinatorExecutor - The ScheduledExecutorService to be used by the coordinator of this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
coordinatorExecutor
public PooledStreamingEventProcessorConfiguration coordinatorExecutor(Supplier<ScheduledExecutorService> coordinatorExecutor)
Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- Parameters:
coordinatorExecutor - A Supplier of the ScheduledExecutorService to be used by the coordinator of this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
workerExecutor
public PooledStreamingEventProcessorConfiguration workerExecutor(ScheduledExecutorService workerExecutor)
Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor. Note that workerExecutor(Supplier) is favored over this method, as it avoids eager initialization of an executor that may be overridden by other components.
- Parameters:
workerExecutor - The ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
workerExecutor
public PooledStreamingEventProcessorConfiguration workerExecutor(Supplier<ScheduledExecutorService> workerExecutor)
Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- Parameters:
workerExecutor - A Supplier of the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
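The preference for a Supplier over an eagerly created executor can be illustrated with a minimal, JDK-only sketch. The class name, the `memoize` helper, and the `newWorkerExecutor` factory are hypothetical and not part of the framework; the sketch only shows that a supplier defers creating the executor until it is first requested.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyExecutorSketch {

    /** Counts how many executors were actually created. */
    static final AtomicInteger CREATED = new AtomicInteger();

    /** Wraps a factory so the value is created at most once, on first use. */
    static <T> Supplier<T> memoize(Supplier<T> factory) {
        return new Supplier<T>() {
            private T instance;

            @Override
            public synchronized T get() {
                if (instance == null) {
                    instance = factory.get();
                }
                return instance;
            }
        };
    }

    /** Hypothetical factory for a single-threaded worker executor. */
    static ScheduledExecutorService newWorkerExecutor() {
        CREATED.incrementAndGet();
        return Executors.newSingleThreadScheduledExecutor();
    }

    public static void main(String[] args) {
        Supplier<ScheduledExecutorService> lazy = memoize(LazyExecutorSketch::newWorkerExecutor);
        System.out.println("created before first use: " + CREATED.get()); // 0
        ScheduledExecutorService executor = lazy.get();
        lazy.get(); // the second call reuses the same instance
        System.out.println("created after use: " + CREATED.get()); // 1
        executor.shutdown();
    }
}
```

Going by the signature documented above, such a supplier would be passed as `workerExecutor(lazy)`. If another component later replaces the supplier, the unused executor is never created.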
-
initialSegmentCount
Sets the initial segment count used to create segments on startup. Only used whenever there are no segments stored in the configured TokenStore upon startup of this StreamingEventProcessor. The given value should be at least 1. Defaults to 16.
- Parameters:
initialSegmentCount - The int specifying the initial segment count used to create segments on startup.
- Returns:
- The current instance, for fluent interfacing.
-
initialToken
public PooledStreamingEventProcessorConfiguration initialToken(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken)
Specifies the Function used to generate the initial TrackingTokens. The function will be given the configured StreamableEventSource so that its methods can be invoked for token creation.
Defaults to an automatic replay since the start of the stream.
More specifically, it defaults to a ReplayToken that starts streaming from the tail with the replay flag enabled until the head at the moment of initialization is reached.
- Parameters:
initialToken - The Function generating the initial TrackingToken based on a given StreamableEventSource.
- Returns:
- The current instance, for fluent interfacing.
-
tokenClaimInterval
Specifies the time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing. Generally, this means all segments are claimed. Defaults to 5000 milliseconds.
- Parameters:
tokenClaimInterval - The time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing.
- Returns:
- The current instance, for fluent interfacing.
-
maxClaimedSegments
Sets the maximum number of segments this instance may claim.
- Parameters:
maxClaimedSegments - The maximum number of segments this instance may claim.
- Returns:
- The current instance, for fluent interfacing.
-
maxSegmentProvider
public PooledStreamingEventProcessorConfiguration maxSegmentProvider(MaxSegmentProvider maxSegmentProvider)
Defines the maximum number of segments this StreamingEventProcessor may claim per instance. Defaults to MaxSegmentProvider.maxShort().
- Parameters:
maxSegmentProvider - A MaxSegmentProvider providing the maximum number of segments this StreamingEventProcessor may claim per instance.
- Returns:
- The current instance, for fluent interfacing.
-
claimExtensionThreshold
public PooledStreamingEventProcessorConfiguration claimExtensionThreshold(long claimExtensionThreshold)
Specifies a time in milliseconds the work packages of this processor should extend the claim on a TrackingToken. The threshold will only be met in absence of regular event processing, since that updates the TrackingToken automatically. Defaults to 5000 milliseconds.
- Parameters:
claimExtensionThreshold - The time in milliseconds the work packages of this processor should extend the claim on a TrackingToken.
- Returns:
- The current instance, for fluent interfacing.
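How a threshold of this kind interacts with a Clock can be sketched with the JDK alone. This is an assumption-laden illustration, not the framework's implementation: the method name `shouldExtendClaim`, the `lastClaimUpdate` bookkeeping, and the strict "greater than" comparison are all hypothetical.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class ClaimExtensionSketch {

    /** Returns true when the claim has not been refreshed within the threshold. */
    static boolean shouldExtendClaim(Clock clock, Instant lastClaimUpdate, long thresholdMillis) {
        long idleMillis = Duration.between(lastClaimUpdate, clock.instant()).toMillis();
        return idleMillis > thresholdMillis; // assumption: strictly greater than
    }

    public static void main(String[] args) {
        Instant lastUpdate = Instant.parse("2024-01-01T00:00:00Z");
        // Fixed clocks make the time-dependent decision deterministic.
        Clock afterThreeSeconds = Clock.fixed(lastUpdate.plusSeconds(3), ZoneOffset.UTC);
        Clock afterSixSeconds = Clock.fixed(lastUpdate.plusSeconds(6), ZoneOffset.UTC);
        System.out.println(shouldExtendClaim(afterThreeSeconds, lastUpdate, 5000)); // false
        System.out.println(shouldExtendClaim(afterSixSeconds, lastUpdate, 5000));   // true
    }
}
```

Injecting the Clock rather than calling the system time directly is what makes such time-dependent decisions testable with fixed instants.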
-
batchSize
Specifies the number of events to be processed inside a single transaction. Defaults to a batch size of 1.
Increasing this value will increase the processing speed dramatically, but requires certainty that the operations performed during event handling can be rolled back.
- Parameters:
batchSize - The number of events to be processed inside a single transaction.
- Returns:
- The current instance, for fluent interfacing.
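The effect of the batch size can be pictured as grouping a stream of events into chunks, each of which would be handled in one transaction. The sketch below is a JDK-only illustration; `toBatches` is a hypothetical helper and says nothing about how the processor batches events internally.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizeSketch {

    /** Splits the events into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> toBatches(List<T> events, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < events.size(); i += batchSize) {
            int end = Math.min(i + batchSize, events.size());
            batches.add(new ArrayList<>(events.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "b", "c", "d", "e");
        // With a batch size of 2, five events need three transactions instead of five.
        System.out.println(toBatches(events, 2)); // [[a, b], [c, d], [e]]
    }
}
```

A failure anywhere in a batch affects the whole batch, which is why larger batches require that the handling operations can be rolled back.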
-
clock
Defines the Clock used for time-dependent operations by this EventProcessor. Used by the Coordinator and WorkPackage threads to decide when to perform certain tasks, like updating TrackingToken claims or when to unmark a Segment as "unclaimable". Defaults to GenericEventMessage.clock.
- Parameters:
clock - The Clock used for time-dependent operations by this EventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
enableCoordinatorClaimExtension
Enables the Coordinator to extend the claims of its WorkPackages.
Enabling "coordinator claim extension" is an optimization as it relieves this effort from the WorkPackage. Toggling this feature may be particularly useful whenever the event handling task of the WorkPackage is lengthy, either because of a hefty event handling component or because of a large batchSize(int).
An example of a lengthy processing task is whenever handling a batch of events exceeds half the claimTimeout of the TokenStore. The claimTimeout defaults to 10 seconds for all durable TokenStore implementations.
In both scenarios, there's a window of opportunity that the WorkPackage is not fast enough in extending the claim itself. Not being able to do so potentially causes token stealing by other instances of this PooledStreamingEventProcessor, thus overburdening the overall event processing task.
Note that enabling this feature will result in more frequent invocation of the TokenStore to update the tokens.
- Returns:
- The current instance, for fluent interfacing.
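The "half the claimTimeout" rule of thumb above can be written down as simple arithmetic. The sketch is only a heuristic check under stated assumptions: the class and method names are hypothetical, and the 10 second value is the default claimTimeout mentioned in the description.

```java
public class CoordinatorClaimExtensionSketch {

    /** Default claimTimeout of durable TokenStore implementations, per the description: 10 seconds. */
    static final long DEFAULT_CLAIM_TIMEOUT_MILLIS = 10_000;

    /** A batch counts as "lengthy" when handling it exceeds half the claim timeout. */
    static boolean isLengthyTask(long batchHandlingMillis, long claimTimeoutMillis) {
        return batchHandlingMillis > claimTimeoutMillis / 2;
    }

    public static void main(String[] args) {
        // 4 seconds per batch stays below the 5 second half-way mark.
        System.out.println(isLengthyTask(4_000, DEFAULT_CLAIM_TIMEOUT_MILLIS)); // false
        // 6 seconds per batch exceeds it, so coordinator claim extension may be worth enabling.
        System.out.println(isLengthyTask(6_000, DEFAULT_CLAIM_TIMEOUT_MILLIS)); // true
    }
}
```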
-
ignoredMessageHandler
public PooledStreamingEventProcessorConfiguration ignoredMessageHandler(Consumer<? super EventMessage> ignoredMessageHandler)
Sets the handler that is invoked when the event is ignored by all WorkPackages this Coordinator controls. Defaults to a no-op.
- Parameters:
ignoredMessageHandler - The handler that is invoked when the event is ignored by all WorkPackages this Coordinator controls.
- Returns:
- The current Builder instance, for fluent interfacing.
-
eventCriteria
public PooledStreamingEventProcessorConfiguration eventCriteria(Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider)
Sets the function to build the EventCriteria used to filter events when opening the event source. The function receives the set of supported event types from the assigned EventHandlingComponent.
Intention: This function is mainly intended to allow you to specify the tags for filtering or to build more complex criteria. For example, if not all supported event types share the same tag, you may use EventCriteria.either(EventCriteria...) to construct a disjunction of criteria for different event types and tags. See EventCriteria for advanced usage and examples.
By default, it returns EventCriteria.havingAnyTag().andBeingOneOfTypes(supportedEvents).
- Parameters:
eventCriteriaProvider - The function to build the EventCriteria from supported event types.
- Returns:
- The current instance, for fluent interfacing.
-
schedulingProcessingContextProvider
public PooledStreamingEventProcessorConfiguration schedulingProcessingContextProvider(Supplier<ProcessingContext> schedulingProcessingContextProvider)
Provides a ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage. The provided ProcessingContext is enriched with resources from the MessageStream.Entry to evaluate whether the event can be handled by this package's Segment. Currently, the only usage of the context is for EventHandlingComponent.sequenceIdentifierFor(EventMessage, ProcessingContext) execution.
- Parameters:
schedulingProcessingContextProvider - The ProcessingContext provider.
- Returns:
- The current Builder instance, for fluent interfacing.
-
addSegmentChangeListener
public PooledStreamingEventProcessorConfiguration addSegmentChangeListener(SegmentChangeListener segmentChangeListener)
Adds a listener invoked when segments are claimed or released.
- Parameters:
segmentChangeListener - The listener to add.
- Returns:
- The current instance, for fluent interfacing.
-
deadLetterQueue
public PooledStreamingEventProcessorConfiguration deadLetterQueue(UnaryOperator<DeadLetterQueueConfiguration> customization)
Configures the Dead Letter Queue (DLQ) for this processor using a customization function.
The DLQ allows failed events to be stored for later processing or manual inspection. This method accepts a customization function that modifies a DeadLetterQueueConfiguration.
This method supports natural merging with defaults. When combining multiple customizations (e.g., defaults with processor-specific settings), each customization is applied in sequence. Later customizations can override earlier settings while preserving others.
The queue will be automatically wrapped with a CachingSequencedDeadLetterQueue to optimize contains() lookups. The cache is cleared when segments are released to ensure consistency.
Example usage:
    config.deadLetterQueue(dlq -> dlq
        .queue(InMemorySequencedDeadLetterQueue.defaultQueue())
        .enqueuePolicy((letter, cause) -> Decisions.enqueue(cause))
        .clearOnReset(false)
        .cacheMaxSize(2048)
    )
- Parameters:
customization - A function that customizes the DeadLetterQueueConfiguration.
- Returns:
- The current instance, for fluent interfacing.
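The merging behaviour described above, where customizations are applied in sequence and later ones override earlier ones while preserving the rest, can be sketched with plain UnaryOperator composition. `DlqSettings` is a hypothetical stand-in and not the real DeadLetterQueueConfiguration; the initial values used here are illustrative rather than framework defaults.

```java
import java.util.List;
import java.util.function.UnaryOperator;

public class CustomizationMergeSketch {

    /** Hypothetical, immutable stand-in for a DLQ configuration. */
    record DlqSettings(boolean clearOnReset, int cacheMaxSize) {
        DlqSettings clearOnReset(boolean value) {
            return new DlqSettings(value, cacheMaxSize);
        }

        DlqSettings cacheMaxSize(int value) {
            return new DlqSettings(clearOnReset, value);
        }
    }

    /** Applies each customization in order; later ones see the result of earlier ones. */
    static DlqSettings merge(DlqSettings initial, List<UnaryOperator<DlqSettings>> customizations) {
        DlqSettings result = initial;
        for (UnaryOperator<DlqSettings> customization : customizations) {
            result = customization.apply(result);
        }
        return result;
    }

    public static void main(String[] args) {
        UnaryOperator<DlqSettings> sharedDefaults = s -> s.cacheMaxSize(2048).clearOnReset(true);
        UnaryOperator<DlqSettings> processorSpecific = s -> s.clearOnReset(false);
        DlqSettings merged = merge(new DlqSettings(true, 1024), List.of(sharedDefaults, processorSpecific));
        // cacheMaxSize survives from the shared defaults; clearOnReset is overridden.
        System.out.println(merged); // DlqSettings[clearOnReset=false, cacheMaxSize=2048]
    }
}
```

Keeping the customizations as functions, rather than merging finished configuration objects, means a later customization only needs to mention the settings it wants to change.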
-
validate
Description copied from class: EventProcessorConfiguration
Validates whether the fields contained in this Builder are set accordingly.
- Overrides:
validate in class EventProcessorConfiguration
- Throws:
AxonConfigurationException - if one field is asserted to be incorrect according to the Builder's specifications
-
streaming
public boolean streaming()
Description copied from class: EventProcessorConfiguration
Returns whether this configuration is for a streaming event processor.
- Overrides:
streaming in class EventProcessorConfiguration
- Returns:
false for basic configuration, true for streaming configurations.
-
eventSource
Returns the StreamableEventSource used to track events.
- Returns:
- The StreamableEventSource for this processor.
-
tokenStore
Returns the TokenStore used to store and fetch event tokens.
- Returns:
- The TokenStore for tracking progress.
-
coordinatorExecutor
Returns the coordinator's ScheduledExecutorService.
- Returns:
- The coordinator executor.
-
workerExecutor
Returns the worker's ScheduledExecutorService.
- Returns:
- The worker executor.
-
initialSegmentCount
public int initialSegmentCount()
Returns the initial segment count used on startup.
- Returns:
- The initial segment count.
-
initialToken
Returns the function used to generate initial TrackingTokens.
- Returns:
- The initial token generation function.
-
tokenClaimInterval
public long tokenClaimInterval()
Returns the token claim interval in milliseconds.
- Returns:
- The token claim interval.
-
maxSegmentProvider
Returns the MaxSegmentProvider defining maximum claimable segments.
- Returns:
- The MaxSegmentProvider for this processor.
-
claimExtensionThreshold
public long claimExtensionThreshold()
Returns the claim extension threshold in milliseconds.
- Returns:
- The claim extension threshold.
-
batchSize
public int batchSize()
Returns the number of events processed in a single transaction.
- Returns:
- The batch size.
-
clock
Returns the Clock used for time-dependent operations.
- Returns:
- The Clock for this processor.
-
coordinatorExtendsClaims
public boolean coordinatorExtendsClaims()
Returns whether the coordinator extends claims for work packages.
- Returns:
true if the coordinator extends claims, false otherwise.
-
eventCriteriaProvider
Returns the function to build EventCriteria from supported event types.
- Returns:
- The event criteria provider function.
-
interceptors
Returns the list of EventMessage-specific MessageHandlerInterceptors to add to the PooledStreamingEventProcessor under construction with this configuration implementation.
- Returns:
- The list of EventMessage-specific MessageHandlerInterceptors to add to the PooledStreamingEventProcessor under construction with this configuration implementation.
-
ignoredMessageHandler
Returns the handler for ignored messages.
- Returns:
- The ignored message handler.
-
schedulingProcessingContextProvider
Returns the Supplier providing the ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage.
- Returns:
- The ProcessingContext provider.
-
segmentChangeListener
Returns the configured segment change listener composed from all registered listeners.
- Returns:
- The composed segment change listener.
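What "composed from all registered listeners" could mean is sketched below with a hypothetical `Listener` interface. The real SegmentChangeListener API is not shown in this page and may look quite different; the sketch only illustrates folding several listeners into one that notifies each in registration order.

```java
import java.util.ArrayList;
import java.util.List;

public class ComposedListenerSketch {

    /** Hypothetical stand-in; the real SegmentChangeListener API may differ. */
    interface Listener {
        void onSegmentChange(int segmentId, boolean claimed);

        /** Returns a listener that notifies this listener first, then the next one. */
        default Listener andThen(Listener next) {
            return (segmentId, claimed) -> {
                this.onSegmentChange(segmentId, claimed);
                next.onSegmentChange(segmentId, claimed);
            };
        }
    }

    /** Folds all listeners into a single one, starting from a no-op. */
    static Listener compose(List<Listener> listeners) {
        Listener composed = (segmentId, claimed) -> { };
        for (Listener listener : listeners) {
            composed = composed.andThen(listener);
        }
        return composed;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Listener metrics = (id, claimed) -> log.add("metrics:" + id + ":" + claimed);
        Listener audit = (id, claimed) -> log.add("audit:" + id + ":" + claimed);
        compose(List.of(metrics, audit)).onSegmentChange(3, true);
        System.out.println(log); // [metrics:3:true, audit:3:true]
    }
}
```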
-
deadLetterQueue
Returns the merged DeadLetterQueueConfiguration by applying all customizations.
This method creates a new configuration instance and applies the accumulated customizations. Use DeadLetterQueueConfiguration.isEnabled() to check if a queue is configured.
- Returns:
- The merged dead letter queue configuration.
-
describeTo
Description copied from interface: DescribableComponent
Describe the properties of this DescribableComponent with the given descriptor.
Components should call the appropriate describeProperty methods on the descriptor to register their properties. The descriptor is responsible for determining how these properties are formatted and structured in the final output.
Best Practices: As a general rule, all relevant fields of a DescribableComponent implementation should be described in this method. However, developers have discretion to include only the fields that make sense in the context. Not every field may be meaningful for description purposes, especially internal implementation details. Furthermore, components might want to expose different information based on their current state. The final decision on what properties to include lies with the person implementing the describeTo method, who should focus on providing information that is useful for understanding the component's configuration and state.
Example implementation:
    public void describeTo(ComponentDescriptor descriptor) {
        descriptor.describeProperty("name", this.name);
        descriptor.describeProperty("enabled", this.enabled);
        descriptor.describeProperty("configuration", this.configuration); // A nested component
        descriptor.describeProperty("handlers", this.eventHandlers);      // A collection
    }
- Specified by:
describeTo in interface DescribableComponent
- Overrides:
describeTo in class EventProcessorConfiguration
- Parameters:
descriptor - The component descriptor to describe this DescribableComponent and its properties in.
-