Class PooledStreamingEventProcessorConfiguration
- All Implemented Interfaces:
DescribableComponent
PooledStreamingEventProcessor.
Upon initialization of this configuration, the following fields are defaulted:
- The ErrorHandler is defaulted to a PropagatingErrorHandler.
- The initialSegmentCount defaults to 16.
- The initialToken function defaults to a ReplayToken that starts streaming from the tail with the replay flag enabled until the head at the moment of initialization is reached.
- The tokenClaimInterval defaults to 5000 milliseconds.
- The MaxSegmentProvider (used by PooledStreamingEventProcessor.maxCapacity()) defaults to MaxSegmentProvider.maxShort().
- The claimExtensionThreshold defaults to 5000 milliseconds.
- The batchSize defaults to 1.
- The Clock defaults to GenericEventMessage.clock.
- The coordinatorExtendsClaims defaults to false.
The following fields of this configuration are hard requirements and as such should be provided:
- A StreamableEventSource used to retrieve events.
- A TokenStore to store the progress of this processor in.
- A ScheduledExecutorService to coordinate events and segment operations.
- A ScheduledExecutorService to process work packages.
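The documented defaults and the fluent style of this configuration can be pictured with a small stand-in class. This is only a sketch: SketchConfig and everything in it are hypothetical names that mirror the default values listed above; it is not the Axon class and does not depend on the framework.

```java
// Hypothetical stand-in that mirrors the documented defaults; it is NOT the Axon class.
final class SketchConfig {
    private int initialSegmentCount = 16;
    private long tokenClaimInterval = 5000;      // milliseconds
    private long claimExtensionThreshold = 5000; // milliseconds
    private int batchSize = 1;
    private boolean coordinatorExtendsClaims = false;

    // Fluent setters return this instance so calls can be chained.
    SketchConfig initialSegmentCount(int initialSegmentCount) {
        this.initialSegmentCount = initialSegmentCount;
        return this;
    }

    SketchConfig batchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }

    SketchConfig enableCoordinatorClaimExtension() {
        this.coordinatorExtendsClaims = true;
        return this;
    }

    // Accessors share the property name with the setters.
    int initialSegmentCount() { return initialSegmentCount; }
    long tokenClaimInterval() { return tokenClaimInterval; }
    long claimExtensionThreshold() { return claimExtensionThreshold; }
    int batchSize() { return batchSize; }
    boolean coordinatorExtendsClaims() { return coordinatorExtendsClaims; }

    public static void main(String[] args) {
        SketchConfig config = new SketchConfig().batchSize(10).enableCoordinatorClaimExtension();
        System.out.println("batchSize=" + config.batchSize()
                + ", initialSegmentCount=" + config.initialSegmentCount());
    }
}
```

Anything not set explicitly keeps its default, which is why only the overridden values appear in the chained call.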
- Since:
- 5.0.0
- Author:
- Mateusz Nowak
-
Field Summary
Fields inherited from class org.axonframework.messaging.eventhandling.configuration.EventProcessorConfiguration
errorHandler, interceptors, messageMonitor, unitOfWorkFactory
-
Constructor Summary
Constructors
- PooledStreamingEventProcessorConfiguration(): Constructs a new PooledStreamingEventProcessorConfiguration with just default values.
- PooledStreamingEventProcessorConfiguration(EventProcessorConfiguration base): Constructs a new PooledStreamingEventProcessorConfiguration copying properties from the given configuration.
- PooledStreamingEventProcessorConfiguration(EventProcessorConfiguration base, Configuration configuration): Constructs a new PooledStreamingEventProcessorConfiguration with default values and retrieves global default values.
-
Method Summary
Modifier and type (where shown), method, and description:
- int batchSize(): Returns the number of events processed in a single transaction.
- batchSize(int batchSize): Specifies the number of events to be processed inside a single transaction.
- long claimExtensionThreshold(): Returns the claim extension threshold in milliseconds.
- claimExtensionThreshold(long claimExtensionThreshold): Specifies a time in milliseconds the work packages of this processor should extend the claim on a TrackingToken.
- clock(): Returns the Clock used for time-dependent operations.
- clock(Clock clock): Defines the Clock used for time dependent operation by this EventProcessor.
- coordinatorExecutor(): Returns the coordinator's ScheduledExecutorService.
- coordinatorExecutor(ScheduledExecutorService coordinatorExecutor): Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- coordinatorExecutor(Supplier<ScheduledExecutorService> coordinatorExecutor): Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- boolean coordinatorExtendsClaims(): Returns whether the coordinator extends claims for work packages.
- void describeTo(ComponentDescriptor descriptor): Describe the properties of this DescribableComponent with the given descriptor.
- enableCoordinatorClaimExtension(): Enables the Coordinator to extend the claims of its WorkPackages.
- errorHandler(ErrorHandler errorHandler): Sets the ErrorHandler invoked when a UnitOfWork throws an exception during processing.
- eventCriteria(Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider): Sets the function to build the EventCriteria used to filter events when opening the event source.
- eventCriteriaProvider(): Returns the function to build EventCriteria from supported event types.
- eventSource(): Returns the StreamableEventSource used to track events.
- eventSource(StreamableEventSource eventSource)
- Consumer<? super EventMessage> ignoredMessageHandler(): Returns the handler for ignored messages.
- ignoredMessageHandler(Consumer<? super EventMessage> ignoredMessageHandler): Sets the handler that is invoked when the event is ignored by all WorkPackages this Coordinator controls.
- int initialSegmentCount(): Returns the initial segment count used on startup.
- initialSegmentCount(int initialSegmentCount): Sets the initial segment count used to create segments on startup.
- initialToken(): Returns the function used to generate initial TrackingTokens.
- initialToken(Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken): Specifies the Function used to generate the initial TrackingTokens.
- maxClaimedSegments(int maxClaimedSegments): Sets the maximum number of segments this instance may claim.
- maxSegmentProvider(): Returns the MaxSegmentProvider defining maximum claimable segments.
- maxSegmentProvider(MaxSegmentProvider maxSegmentProvider): Defines the maximum number of segments this StreamingEventProcessor may claim per instance.
- schedulingProcessingContextProvider(): Returns the Supplier providing the ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage.
- schedulingProcessingContextProvider(Supplier<ProcessingContext> schedulingProcessingContextProvider): Provides a ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage.
- boolean streaming(): Returns whether this configuration is for a streaming event processor.
- long tokenClaimInterval(): Returns the token claim interval in milliseconds.
- tokenClaimInterval(long tokenClaimInterval): Specifies the time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing.
- tokenStore(): Returns the TokenStore used to store and fetch event tokens.
- tokenStore(TokenStore tokenStore): Sets the TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
- unitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory): A UnitOfWorkFactory that spawns UnitOfWork instances used to process an event batch.
- protected void validate(): Validates whether the fields contained in this Builder are set accordingly.
- withInterceptor(MessageHandlerInterceptor<? super EventMessage> interceptor): Registers the given EventMessage-specific MessageHandlerInterceptor for the PooledStreamingEventProcessor under construction.
- workerExecutor(): Returns the worker's ScheduledExecutorService.
- workerExecutor(ScheduledExecutorService workerExecutor): Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- workerExecutor(Supplier<ScheduledExecutorService> workerExecutor): Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
Methods inherited from class org.axonframework.messaging.eventhandling.configuration.EventProcessorConfiguration
errorHandler, interceptors, unitOfWorkFactory
-
Constructor Details
-
PooledStreamingEventProcessorConfiguration
Constructs a new PooledStreamingEventProcessorConfiguration with just default values. Does not retrieve any global default values.
This configuration will not have any of the default MessageHandlerInterceptors for events. Please use PooledStreamingEventProcessorConfiguration(EventProcessorConfiguration, Configuration) when those are desired.
-
PooledStreamingEventProcessorConfiguration
@Internal public PooledStreamingEventProcessorConfiguration(@Nonnull EventProcessorConfiguration base)
Constructs a new PooledStreamingEventProcessorConfiguration copying properties from the given configuration.
- Parameters:
base - The EventProcessorConfiguration to copy properties from.
-
PooledStreamingEventProcessorConfiguration
@Internal public PooledStreamingEventProcessorConfiguration(@Nonnull EventProcessorConfiguration base, @Nonnull Configuration configuration)
Constructs a new PooledStreamingEventProcessorConfiguration with default values and retrieves global default values.
- Parameters:
base - The EventProcessorConfiguration to copy properties from.
configuration - The configuration used to retrieve global default values, like MessageHandlerInterceptors, from.
-
-
Method Details
-
errorHandler
Description copied from class: EventProcessorConfiguration
Sets the ErrorHandler invoked when a UnitOfWork throws an exception during processing. Defaults to a PropagatingErrorHandler.
- Overrides:
errorHandler in class EventProcessorConfiguration
- Parameters:
errorHandler - The ErrorHandler invoked when a UnitOfWork throws an exception during processing.
- Returns:
- The current instance, for fluent interfacing.
-
unitOfWorkFactory
public PooledStreamingEventProcessorConfiguration unitOfWorkFactory(@Nonnull UnitOfWorkFactory unitOfWorkFactory)
Description copied from class: EventProcessorConfiguration
A UnitOfWorkFactory that spawns UnitOfWork instances used to process an event batch.
- Overrides:
unitOfWorkFactory in class EventProcessorConfiguration
- Parameters:
unitOfWorkFactory - A UnitOfWorkFactory that spawns UnitOfWork instances.
- Returns:
- The current instance, for fluent interfacing.
-
eventSource
public PooledStreamingEventProcessorConfiguration eventSource(@Nonnull StreamableEventSource eventSource)
- Parameters:
eventSource - The StreamableEventSource (e.g. the EventStore) which this EventProcessor will track.
- Returns:
- The current instance, for fluent interfacing.
-
withInterceptor
@Nonnull public PooledStreamingEventProcessorConfiguration withInterceptor(@Nonnull MessageHandlerInterceptor<? super EventMessage> interceptor)
Registers the given EventMessage-specific MessageHandlerInterceptor for the PooledStreamingEventProcessor under construction.
- Parameters:
interceptor - The EventMessage-specific MessageHandlerInterceptor to register for the PooledStreamingEventProcessor under construction.
- Returns:
- This PooledStreamingEventProcessorConfiguration, for fluent interfacing.
-
tokenStore
Sets the TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
- Parameters:
tokenStore - The TokenStore used to store and fetch event tokens that enable this EventProcessor to track its progress.
- Returns:
- The current instance, for fluent interfacing.
-
coordinatorExecutor
public PooledStreamingEventProcessorConfiguration coordinatorExecutor(@Nonnull ScheduledExecutorService coordinatorExecutor)
Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- Parameters:
coordinatorExecutor - The ScheduledExecutorService to be used by the coordinator of this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
coordinatorExecutor
public PooledStreamingEventProcessorConfiguration coordinatorExecutor(@Nonnull Supplier<ScheduledExecutorService> coordinatorExecutor)
Specifies the ScheduledExecutorService used by the coordinator of this PooledStreamingEventProcessor.
- Parameters:
coordinatorExecutor - The ScheduledExecutorService to be used by the coordinator of this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
workerExecutor
public PooledStreamingEventProcessorConfiguration workerExecutor(@Nonnull ScheduledExecutorService workerExecutor)
Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor. Note that workerExecutor(Supplier) is favored over this method, as it avoids eager initialization of an executor that may be overridden by other components.
- Parameters:
workerExecutor - The ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
-
workerExecutor
public PooledStreamingEventProcessorConfiguration workerExecutor(@Nonnull Supplier<ScheduledExecutorService> workerExecutor)
Specifies the ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- Parameters:
workerExecutor - The ScheduledExecutorService to be provided to the WorkPackages created by this PooledStreamingEventProcessor.
- Returns:
- The current instance, for fluent interfacing.
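The preference for the Supplier variant comes down to when the executor is created. The sketch below uses only java.util.concurrent to show the difference; LazyExecutorSketch, CREATED and workerExecutorSupplier are hypothetical names for illustration, and the pool size of 2 is an arbitrary assumption.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of lazy executor creation through a Supplier.
final class LazyExecutorSketch {
    // Counts how many executors have actually been constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Nothing is created here; the executor is built only when get() is called.
    static Supplier<ScheduledExecutorService> workerExecutorSupplier() {
        return () -> {
            CREATED.incrementAndGet();
            return Executors.newScheduledThreadPool(2);
        };
    }

    public static void main(String[] args) {
        Supplier<ScheduledExecutorService> supplier = workerExecutorSupplier();
        System.out.println("executors created before get(): " + CREATED.get());
        ScheduledExecutorService executor = supplier.get();
        System.out.println("executors created after get(): " + CREATED.get());
        executor.shutdown();
    }
}
```

If another component replaces the supplier before it is ever invoked, no thread pool is constructed for the discarded one, which is the saving the note on eager initialization refers to.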
-
initialSegmentCount
Sets the initial segment count used to create segments on startup. Only used when there are no segments stored in the configured TokenStore upon startup of this StreamingEventProcessor. The given value should be at least 1. Defaults to 16.
- Parameters:
initialSegmentCount - The int specifying the initial segment count used to create segments on startup.
- Returns:
- The current instance, for fluent interfacing.
-
initialToken
public PooledStreamingEventProcessorConfiguration initialToken(@Nonnull Function<TrackingTokenSource, CompletableFuture<TrackingToken>> initialToken)
Specifies the Function used to generate the initial TrackingTokens. The function will be given the configured StreamableEventSource so that its methods can be invoked for token creation.
Defaults to an automatic replay from the start of the stream.
More specifically, it defaults to a ReplayToken that starts streaming from the tail with the replay flag enabled until the head at the moment of initialization is reached.
- Parameters:
initialToken - The Function generating the initial TrackingToken based on a given StreamableEventSource.
- Returns:
- The current instance, for fluent interfacing.
-
tokenClaimInterval
Specifies the time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing. Generally, this means all segments are claimed. Defaults to 5000 milliseconds.
- Parameters:
tokenClaimInterval - The time in milliseconds the processor's coordinator should wait after a failed attempt to claim any segments for processing.
- Returns:
- The current instance, for fluent interfacing.
-
maxClaimedSegments
Sets the maximum number of segments this instance may claim.
- Parameters:
maxClaimedSegments - The maximum number of segments this instance may claim.
- Returns:
- The current instance, for fluent interfacing.
-
maxSegmentProvider
public PooledStreamingEventProcessorConfiguration maxSegmentProvider(@Nonnull MaxSegmentProvider maxSegmentProvider)
Defines the maximum number of segments this StreamingEventProcessor may claim per instance. Defaults to MaxSegmentProvider.maxShort().
- Parameters:
maxSegmentProvider - A MaxSegmentProvider providing the maximum number of segments this StreamingEventProcessor may claim per instance.
- Returns:
- The current instance, for fluent interfacing.
-
claimExtensionThreshold
public PooledStreamingEventProcessorConfiguration claimExtensionThreshold(long claimExtensionThreshold)
Specifies a time in milliseconds the work packages of this processor should extend the claim on a TrackingToken. The threshold will only be met in the absence of regular event processing, since that updates the TrackingToken automatically. Defaults to 5000 milliseconds.
- Parameters:
claimExtensionThreshold - The time in milliseconds the work packages of this processor should extend the claim on a TrackingToken.
- Returns:
- The current instance, for fluent interfacing.
-
batchSize
Specifies the number of events to be processed inside a single transaction. Defaults to a batch size of 1.
Increasing this value can increase the processing speed dramatically, but requires certainty that the operations performed during event handling can be rolled back.
- Parameters:
batchSize - The number of events to be processed inside a single transaction.
- Returns:
- The current instance, for fluent interfacing.
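To make the effect of the batch size concrete, the sketch below splits a list of events into consecutive batches, where each batch stands for the events that would share one transaction. BatchSketch and batches are hypothetical names; the code only illustrates the grouping and is not how the processor itself is implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of grouping events into transaction-sized batches.
final class BatchSketch {
    // Splits events into consecutive batches of at most batchSize elements.
    static <T> List<List<T>> batches(List<T> events, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < events.size(); start += batchSize) {
            int end = Math.min(start + batchSize, events.size());
            // Copy the sub-list so each batch is independent of the input list.
            result.add(new ArrayList<>(events.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(batches(List.of("e1", "e2", "e3", "e4", "e5"), 2));
    }
}
```

With larger batches, fewer transactions are needed for the same number of events, but everything handled within one batch has to be undoable together, which is the rollback requirement stated above.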
-
clock
Defines the Clock used for time dependent operation by this EventProcessor. Used by the Coordinator and WorkPackage threads to decide when to perform certain tasks, like updating TrackingToken claims or when to unmark a Segment as "unclaimable". Defaults to GenericEventMessage.clock.
- Parameters:
clock - The Clock used for time dependent operation by this EventProcessor.
- Returns:
- The current instance, for fluent interfacing.
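A configurable Clock mainly pays off in tests, where time can be pinned. The sketch below shows a time-based decision of the kind described above, using java.time only. ClaimClockSketch and extensionDue are hypothetical names, and the comparison is an assumed reading of how a millisecond threshold might be checked, not the framework's actual logic.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical illustration of a clock-driven "is it time yet?" check.
final class ClaimClockSketch {
    // True once at least thresholdMillis have passed since the last claim update.
    static boolean extensionDue(Clock clock, Instant lastClaimUpdate, long thresholdMillis) {
        long elapsedMillis = Duration.between(lastClaimUpdate, clock.instant()).toMillis();
        return elapsedMillis >= thresholdMillis;
    }

    public static void main(String[] args) {
        Instant lastUpdate = Instant.parse("2024-01-01T00:00:00Z");
        // A fixed clock makes the outcome independent of the wall-clock time.
        Clock sixSecondsLater = Clock.fixed(lastUpdate.plusSeconds(6), ZoneOffset.UTC);
        System.out.println(extensionDue(sixSecondsLater, lastUpdate, 5000));
    }
}
```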
-
enableCoordinatorClaimExtension
Enables the Coordinator to extend the claims of its WorkPackages.
Enabling "coordinator claim extension" is an optimization as it relieves the WorkPackage of this effort. Toggling this feature may be particularly useful whenever the event handling task of the WorkPackage is lengthy, either because of a hefty event handling component or because of a large batchSize(int).
An example of a lengthy processing task is whenever handling a batch of events exceeds half the claimTimeout of the TokenStore. The claimTimeout defaults to 10 seconds for all durable TokenStore implementations.
In both scenarios, there's a window of opportunity in which the WorkPackage is not fast enough in extending the claim itself. Not being able to do so potentially causes token stealing by other instances of this PooledStreamingEventProcessor, thus overburdening the overall event processing task.
Note that enabling this feature will result in more frequent invocation of the TokenStore to update the tokens.
- Returns:
- The current instance, for fluent interfacing.
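The "half the claimTimeout" rule of thumb is easy to check numerically: with the 10 second default, a batch that takes more than 5 seconds to handle counts as lengthy. The sketch below encodes just that comparison; ClaimExtensionHeuristic and isLengthy are hypothetical helper names, not part of the framework.

```java
import java.time.Duration;

// Hypothetical helper encoding the "more than half the claim timeout" rule of thumb.
final class ClaimExtensionHeuristic {
    // The documented default claimTimeout for durable TokenStore implementations.
    static final Duration DEFAULT_CLAIM_TIMEOUT = Duration.ofSeconds(10);

    // True when handling one batch takes strictly longer than half the claim timeout.
    static boolean isLengthy(Duration batchHandlingTime, Duration claimTimeout) {
        return batchHandlingTime.compareTo(claimTimeout.dividedBy(2)) > 0;
    }

    public static void main(String[] args) {
        System.out.println(isLengthy(Duration.ofSeconds(6), DEFAULT_CLAIM_TIMEOUT));
        System.out.println(isLengthy(Duration.ofSeconds(4), DEFAULT_CLAIM_TIMEOUT));
    }
}
```

Measuring the typical batch handling time and comparing it this way is one possible basis for deciding whether coordinator claim extension is worth the extra TokenStore invocations.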
-
ignoredMessageHandler
public PooledStreamingEventProcessorConfiguration ignoredMessageHandler(Consumer<? super EventMessage> ignoredMessageHandler)
Sets the handler that is invoked when an event is ignored by all WorkPackages this Coordinator controls. Defaults to a no-op.
- Parameters:
ignoredMessageHandler - The handler that is invoked when an event is ignored by all WorkPackages this Coordinator controls.
- Returns:
- The current Builder instance, for fluent interfacing.
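Because the parameter is declared as Consumer<? super EventMessage>, a handler written against a supertype such as Object is accepted as well. The sketch below shows a counting handler built that way; IgnoredCounterSketch is a hypothetical class, and counting is just one assumed use of the hook (for example, feeding a metric).

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical handler that counts ignored messages instead of dropping them silently.
final class IgnoredCounterSketch {
    private final AtomicLong ignored = new AtomicLong();

    // Consumer<Object> fits any parameter declared as Consumer<? super T>.
    Consumer<Object> handler() {
        return message -> ignored.incrementAndGet();
    }

    long count() {
        return ignored.get();
    }

    public static void main(String[] args) {
        IgnoredCounterSketch counter = new IgnoredCounterSketch();
        Consumer<? super String> handler = counter.handler();
        handler.accept("ignored-event");
        System.out.println("ignored so far: " + counter.count());
    }
}
```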
-
eventCriteria
public PooledStreamingEventProcessorConfiguration eventCriteria(@Nonnull Function<Set<QualifiedName>, EventCriteria> eventCriteriaProvider)
Sets the function to build the EventCriteria used to filter events when opening the event source. The function receives the set of supported event types from the assigned EventHandlingComponent.
Intention: This function is mainly intended to allow you to specify the tags for filtering or to build more complex criteria. For example, if not all supported event types share the same tag, you may use EventCriteria.either(EventCriteria...) to construct a disjunction of criteria for different event types and tags. See EventCriteria for advanced usage and examples.
By default, it returns EventCriteria.havingAnyTag().andBeingOneOfTypes(supportedEvents).
- Parameters:
eventCriteriaProvider - The function to build the EventCriteria from supported event types.
- Returns:
- The current instance, for fluent interfacing.
-
schedulingProcessingContextProvider
public PooledStreamingEventProcessorConfiguration schedulingProcessingContextProvider(@Nonnull Supplier<ProcessingContext> schedulingProcessingContextProvider)
Provides a ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage. The provided ProcessingContext is enriched with resources from the MessageStream.Entry to evaluate whether the event can be handled by this package's Segment. Currently, the only usage of the context is for EventHandlingComponent.sequenceIdentifierFor(EventMessage, ProcessingContext) execution.
- Parameters:
schedulingProcessingContextProvider - The ProcessingContext provider.
- Returns:
- The current Builder instance, for fluent interfacing.
-
validate
Description copied from class: EventProcessorConfiguration
Validates whether the fields contained in this Builder are set accordingly.
- Overrides:
validate in class EventProcessorConfiguration
- Throws:
AxonConfigurationException - if one field is asserted to be incorrect according to the Builder's specifications
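A validation step of this kind typically checks that every required field has been supplied before the configuration is used. The sketch below shows the general shape under stated assumptions: RequiredFieldsSketch is hypothetical, the four checked fields are taken from the required components named in the class description, and IllegalStateException stands in for AxonConfigurationException so that the example has no framework dependency.

```java
// Hypothetical illustration of validating required configuration fields.
final class RequiredFieldsSketch {
    // Throws when a required value is missing; stand-in for a framework assertion helper.
    static void assertNonNull(Object value, String message) {
        if (value == null) {
            throw new IllegalStateException(message);
        }
    }

    // Checks the four components the class description lists as needed by the processor.
    static void validate(Object eventSource, Object tokenStore,
                         Object coordinatorExecutor, Object workerExecutor) {
        assertNonNull(eventSource, "The StreamableEventSource is a hard requirement");
        assertNonNull(tokenStore, "The TokenStore is a hard requirement");
        assertNonNull(coordinatorExecutor, "The coordinator executor is a hard requirement");
        assertNonNull(workerExecutor, "The worker executor is a hard requirement");
    }

    public static void main(String[] args) {
        try {
            validate(new Object(), null, new Object(), new Object());
        } catch (IllegalStateException e) {
            System.out.println("validation failed: " + e.getMessage());
        }
    }
}
```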
-
streaming
public boolean streaming()
Description copied from class: EventProcessorConfiguration
Returns whether this configuration is for a streaming event processor.
- Overrides:
streaming in class EventProcessorConfiguration
- Returns:
false for basic configuration, true for streaming configurations.
-
eventSource
Returns the StreamableEventSource used to track events.
- Returns:
- The StreamableEventSource for this processor.
-
tokenStore
Returns the TokenStore used to store and fetch event tokens.
- Returns:
- The TokenStore for tracking progress.
-
coordinatorExecutor
Returns the coordinator's ScheduledExecutorService.
- Returns:
- The coordinator executor.
-
workerExecutor
Returns the worker's ScheduledExecutorService.
- Returns:
- The worker executor.
-
initialSegmentCount
public int initialSegmentCount()
Returns the initial segment count used on startup.
- Returns:
- The initial segment count.
-
initialToken
Returns the function used to generate initial TrackingTokens.
- Returns:
- The initial token generation function.
-
tokenClaimInterval
public long tokenClaimInterval()
Returns the token claim interval in milliseconds.
- Returns:
- The token claim interval.
-
maxSegmentProvider
Returns the MaxSegmentProvider defining maximum claimable segments.
- Returns:
- The MaxSegmentProvider for this processor.
-
claimExtensionThreshold
public long claimExtensionThreshold()
Returns the claim extension threshold in milliseconds.
- Returns:
- The claim extension threshold.
-
batchSize
public int batchSize()
Returns the number of events processed in a single transaction.
- Returns:
- The batch size.
-
clock
Returns the Clock used for time-dependent operations.
- Returns:
- The Clock for this processor.
-
coordinatorExtendsClaims
public boolean coordinatorExtendsClaims()
Returns whether the coordinator extends claims for work packages.
- Returns:
true if the coordinator extends claims, false otherwise.
-
eventCriteriaProvider
Returns the function to build EventCriteria from supported event types.
- Returns:
- The event criteria provider function.
-
ignoredMessageHandler
Returns the handler for ignored messages.
- Returns:
- The ignored message handler.
-
schedulingProcessingContextProvider
Returns the Supplier providing the ProcessingContext used to evaluate whether an event can be scheduled for processing by this WorkPackage.
- Returns:
- The ProcessingContext provider.
-
describeTo
Description copied from interface: DescribableComponent
Describe the properties of this DescribableComponent with the given descriptor.
Components should call the appropriate describeProperty methods on the descriptor to register their properties. The descriptor is responsible for determining how these properties are formatted and structured in the final output.
Best Practices: As a general rule, all relevant fields of a DescribableComponent implementation should be described in this method. However, developers have discretion to include only the fields that make sense in the context. Not every field may be meaningful for description purposes, especially internal implementation details. Furthermore, components might want to expose different information based on their current state. The final decision on what properties to include lies with the person implementing the describeTo method, who should focus on providing information that is useful for understanding the component's configuration and state.
Example implementation:
    public void describeTo(ComponentDescriptor descriptor) {
        descriptor.describeProperty("name", this.name);
        descriptor.describeProperty("enabled", this.enabled);
        descriptor.describeProperty("configuration", this.configuration); // A nested component
        descriptor.describeProperty("handlers", this.eventHandlers);      // A collection
    }
- Specified by:
describeTo in interface DescribableComponent
- Overrides:
describeTo in class EventProcessorConfiguration
- Parameters:
descriptor - The component descriptor to describe this DescribableComponent and its properties in.
-