public class DisruptorCommandBus extends Object implements CommandBus
This CommandBus is backed by a Disruptor, which ensures that the two steps of command handling are executed sequentially in separate threads, while minimizing locking and inter-thread communication.
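The process is split into two separate steps, each of which is executed in a different thread:
- Command Handler execution. This step invokes the command handler with the incoming command. The result and the changes to the aggregate are recorded for the next step.
- Event storage and publication. This step stores all generated domain events and publishes them to the event bus. Finally, an asynchronous task is scheduled to invoke the command callback with the result of command handling.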
This separation of process steps makes this implementation very efficient and highly performing. However, it does
not cope with exceptions very well. When an exception occurs, an Aggregate that has been loaded is potentially
corrupt. That means that an aggregate does not represent a state that can be reproduced by replaying its committed
events. Although this implementation will recover from this corrupt state, it may result in a number of commands
being rejected in the meantime. These commands may be retried if the cause of the AggregateStateCorruptedException
does not indicate a non-transient error.
Commands that have been executed against a potentially corrupt Aggregate will result in an AggregateStateCorruptedException. These commands are automatically rescheduled for processing by default. Use DisruptorCommandBus.Builder.rescheduleCommandsOnCorruptState(boolean) to disable this feature. Note that the order in which commands are executed is not fully guaranteed when this feature is enabled (default).
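The two-step split described above can be illustrated with plain JDK classes. The sketch below is not Axon code and does not use the LMAX Disruptor: a `BlockingQueue` hand-off stands in for the ring buffer, and the class name `TwoStepPipelineSketch`, the `handled:` and `stored:` prefixes, and the `STOP` sentinel are all hypothetical. It only shows the shape of the idea: one thread performs the "handler" step, a second thread performs the "store and publish" step, and ordering is preserved because each step has exactly one thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative only: a BlockingQueue hand-off stands in for the Disruptor ring buffer.
final class TwoStepPipelineSketch {

    // Sentinel compared by identity, so it can never collide with a real command.
    private static final String STOP = new String("STOP");

    /** Runs every command through step 1 and step 2, each step on its own thread. */
    static List<String> process(List<String> commands) {
        BlockingQueue<String> incoming = new LinkedBlockingQueue<>(commands);
        BlockingQueue<String> handled = new LinkedBlockingQueue<>();
        List<String> published = new ArrayList<>();
        incoming.add(STOP);

        // Step 1: stands in for command handler execution.
        Thread invoker = new Thread(() -> {
            for (String c = takeFrom(incoming); c != STOP; c = takeFrom(incoming)) {
                handled.add("handled:" + c);
            }
            handled.add(STOP); // propagate shutdown to step 2
        });

        // Step 2: stands in for event storage and publication.
        Thread publisher = new Thread(() -> {
            for (String r = takeFrom(handled); r != STOP; r = takeFrom(handled)) {
                published.add("stored:" + r);
            }
        });

        invoker.start();
        publisher.start();
        join(invoker);
        join(publisher); // join() makes the publisher's writes visible to the caller
        return published;
    }

    private static String takeFrom(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return STOP; // treat interruption as a request to stop
        }
    }

    private static void join(Thread thread) {
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

A real Disruptor replaces the queues with a pre-allocated ring buffer to reduce locking and allocation; the sketch makes no attempt to reproduce that or the corrupt-state recovery described above.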
Limitations of this implementation
Although this implementation allows applications to achieve extreme performance (over 1M commands on commodity hardware), it does have some limitations. It only allows a single aggregate to be invoked during command processing.
This implementation can only work with Event Sourced Aggregates.
Infrastructure considerations
This CommandBus implementation has special requirements for the Repositories being used during Command Processing.
Therefore, the Repository instance to use in the Command Handler must be created using createRepository(EventStore, AggregateFactory, RepositoryProvider).
Using another repository will most likely result in undefined behavior.
The DisruptorCommandBus must have access to at least 3 threads, two of which are permanently used while the DisruptorCommandBus is operational. At least one additional thread is required to invoke callbacks and initiate a recovery process in the case of exceptions.
Consider providing an alternative IdentifierFactory implementation. The default implementation uses UUID.randomUUID() to generate identifiers for Events. The poor performance of this method severely impacts the overall performance of the DisruptorCommandBus. A better performing alternative is, for example, com.eaio.uuid.UUID.
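As a rough illustration of what a cheaper identifier source could look like, the sketch below builds a version 4 UUID string from `ThreadLocalRandom` instead of the `SecureRandom` that backs `UUID.randomUUID()`. The class name `FastIdentifierSketch` and its `generateIdentifier()` method are hypothetical; wiring such a generator into Axon's IdentifierFactory is not shown here. Note the trade-off: `ThreadLocalRandom` is not cryptographically strong, so this is only appropriate when identifiers need to be unique rather than unpredictable.

```java
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper, not part of Axon: trades unpredictability for speed.
final class FastIdentifierSketch {

    /** Returns a random (version 4, IETF variant) UUID string without using SecureRandom. */
    static String generateIdentifier() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        // Clear the 4 version bits, then mark the UUID as version 4.
        long mostSigBits = (random.nextLong() & 0xFFFFFFFFFFFF0FFFL) | 0x0000000000004000L;
        // Clear the top 2 variant bits, then set them to binary 10 (IETF variant).
        long leastSigBits = (random.nextLong() & 0x3FFFFFFFFFFFFFFFL) | 0x8000000000000000L;
        return new UUID(mostSigBits, leastSigBits).toString();
    }
}
```

Whether this is actually faster in a given deployment should be measured; the gain depends on the JVM and on how contended the secure random source is.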
Modifier and Type | Class and Description |
---|---|
static class | DisruptorCommandBus.Builder: Builder class to instantiate a DisruptorCommandBus. |
Modifier | Constructor and Description |
---|---|
protected | DisruptorCommandBus(DisruptorCommandBus.Builder builder): Instantiate a DisruptorCommandBus based on the fields contained in the DisruptorCommandBus.Builder. |
Modifier and Type | Method and Description |
---|---|
static DisruptorCommandBus.Builder | builder(): Instantiate a Builder to be able to create a DisruptorCommandBus. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory): Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory): Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from the given eventStore. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider): Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from the given eventStore. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, RepositoryProvider repositoryProvider): Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition): Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory): Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider): Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. |
<T> Repository<T> | createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, RepositoryProvider repositoryProvider): Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. |
<C> void | dispatch(CommandMessage<C> command): Dispatch the given command to the CommandHandler subscribed to the given command's name. |
<C,R> void | dispatch(CommandMessage<C> command, CommandCallback<? super C,? super R> callback): Dispatch the given command to the CommandHandler subscribed to the given command's name. |
Registration | registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor): Register the given DispatchInterceptor. |
Registration | registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor): Register the given handlerInterceptor. |
void | stop(): Shuts down the command bus. |
Registration | subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler): Subscribe the given handler to commands with the given commandName. |
protected DisruptorCommandBus(DisruptorCommandBus.Builder builder)
Instantiate a DisruptorCommandBus based on the fields contained in the DisruptorCommandBus.Builder. The Threads required for command execution are immediately requested from the Configuration's Executor, if any. Otherwise, they are created.
Will assert that the CommandTargetResolver, MessageMonitor, RollbackConfiguration, ProducerType, WaitStrategy and Cache are not null. Additional verification is done on the coolingDownPeriod, publisherThreadCount, bufferSize and invokerThreadCount to check whether they are positive numbers. If any of these checks fails, an AxonConfigurationException will be thrown.
Parameters:
builder - the DisruptorCommandBus.Builder used to instantiate a DisruptorCommandBus instance
public static DisruptorCommandBus.Builder builder()
Instantiate a Builder to be able to create a DisruptorCommandBus.
The following configurable fields have defaults:
- rescheduleCommandsOnCorruptState defaults to true.
- coolingDownPeriod defaults to 1000.
- CommandTargetResolver defaults to an AnnotationCommandTargetResolver.
- publisherThreadCount defaults to 1.
- MessageMonitor defaults to NoOpMessageMonitor.INSTANCE.
- RollbackConfiguration defaults to RollbackConfigurationType.UNCHECKED_EXCEPTIONS.
- bufferSize defaults to 4096.
- ProducerType defaults to ProducerType.MULTI.
- WaitStrategy defaults to a BlockingWaitStrategy.
- invokerThreadCount defaults to 1.
- Cache defaults to NoCache.INSTANCE.
- DuplicateCommandHandlerResolver defaults to DuplicateCommandHandlerResolution.logAndOverride().
- Executor.
The CommandTargetResolver, MessageMonitor, RollbackConfiguration, ProducerType, WaitStrategy and Cache are hard requirements. Thus setting them to null will result in an AxonConfigurationException.
Additionally, the coolingDownPeriod, publisherThreadCount, bufferSize and invokerThreadCount have a positive number constraint, thus will also result in an AxonConfigurationException if set otherwise.
Returns:
a Builder to be able to create a DisruptorCommandBus
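The defaults-plus-validation contract described for the builder can be sketched in isolation. The class below is a hypothetical stand-in, not Axon's builder: `BusSettingsSketch` models only `bufferSize`, `invokerThreadCount` and a `cache` placeholder, and it throws `IllegalArgumentException` where the real class is documented to throw AxonConfigurationException. It shows the pattern the text describes: fields start with defaults, null is rejected for hard requirements, and non-positive numbers are rejected.

```java
// Hypothetical stand-in for a builder-configured component; not Axon's actual classes.
final class BusSettingsSketch {
    final int bufferSize;
    final int invokerThreadCount;
    final Object cache;

    private BusSettingsSketch(Builder builder) {
        // Hard requirement: must not be null.
        if (builder.cache == null) {
            throw new IllegalArgumentException("cache is a hard requirement and must not be null");
        }
        // Positive-number constraints.
        requirePositive(builder.bufferSize, "bufferSize");
        requirePositive(builder.invokerThreadCount, "invokerThreadCount");
        this.bufferSize = builder.bufferSize;
        this.invokerThreadCount = builder.invokerThreadCount;
        this.cache = builder.cache;
    }

    private static void requirePositive(int value, String name) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be a positive number, got " + value);
        }
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private int bufferSize = 4096;       // default listed in the text above
        private int invokerThreadCount = 1;  // default listed in the text above
        private Object cache = new Object(); // placeholder for a no-op cache default

        Builder bufferSize(int bufferSize) {
            this.bufferSize = bufferSize;
            return this;
        }

        Builder invokerThreadCount(int invokerThreadCount) {
            this.invokerThreadCount = invokerThreadCount;
            return this;
        }

        Builder cache(Object cache) {
            this.cache = cache;
            return this;
        }

        BusSettingsSketch build() {
            return new BusSettingsSketch(this);
        }
    }
}
```

With this shape, `BusSettingsSketch.builder().build()` yields the defaults, while `builder().bufferSize(0).build()` or `builder().cache(null).build()` fails fast at construction time.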
public <C> void dispatch(@Nonnull CommandMessage<C> command)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to the given command's name. No feedback is given about the status of the dispatching process. Implementations may return immediately after asserting a valid handler is registered for the given command.
Specified by:
dispatch in interface CommandBus
Type Parameters:
C - The payload type of the command to dispatch
Parameters:
command - The Command to dispatch
See Also:
GenericCommandMessage.asCommandMessage(Object)
public <C,R> void dispatch(@Nonnull CommandMessage<C> command, @Nonnull CommandCallback<? super C,? super R> callback)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to the given command's name. When the command is processed, one of the callback's methods is called, depending on the result of the processing.
There are no guarantees about the successful completion of command dispatching or handling after the method returns. Implementations are highly recommended to perform basic validation of the command before returning from this method call.
Implementations must start a UnitOfWork before dispatching the command, and either commit or roll back after a successful or failed execution, respectively.
Specified by:
dispatch in interface CommandBus
Type Parameters:
C - The payload type of the command to dispatch
R - The type of the expected result
Parameters:
command - The Command to dispatch
callback - The callback to invoke when command processing is complete
See Also:
GenericCommandMessage.asCommandMessage(Object)
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
repositoryProvider - Provides repositories for specified aggregate types
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)
Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for creating snapshots
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. The given decorator is used to decorate event streams.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Note that a second invocation of this method with an aggregate factory for the same aggregate type may return the same instance as the first invocation, even if the given decorator is different.
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for creating snapshots
repositoryProvider - Provides repositories for specified aggregate types
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from the given eventStore. Parameters of the annotated methods are resolved using the given parameterResolverFactory.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handlers with
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from the given eventStore. Parameters of the annotated methods are resolved using the given parameterResolverFactory. The given handlerDefinition is used to create handler instances.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handlers with
handlerDefinition - The handler definition used to create concrete handlers
repositoryProvider - Provides repositories for specific aggregate types
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given parameterResolverFactory.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for snapshots
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handlers with
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate, sourced from the given eventStore, that is created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given parameterResolverFactory. The given handlerDefinition is used to create handler instances.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for snapshots
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handlers with
handlerDefinition - The handler definition used to create concrete handlers
repositoryProvider - Provides repositories for specific aggregate types
public Registration subscribe(@Nonnull String commandName, @Nonnull MessageHandler<? super CommandMessage<?>> handler)
Description copied from interface: CommandBus
Subscribe the given handler to commands with the given commandName.
If a subscription already exists for the given name, the behavior is undefined. Implementations may throw an Exception to refuse duplicate subscription or alternatively decide whether the existing or new handler gets the subscription.
Specified by:
subscribe in interface CommandBus
Parameters:
commandName - The name of the command to subscribe the handler to
handler - The handler instance that handles the given type of command
Returns:
a handle to unsubscribe the handler. When unsubscribed it will no longer receive commands.
public void stop()
Shuts down the command bus.
@Nonnull public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor)
Description copied from interface: MessageDispatchInterceptorSupport
Register the given DispatchInterceptor.
Specified by:
registerDispatchInterceptor in interface MessageDispatchInterceptorSupport<CommandMessage<?>>
Parameters:
dispatchInterceptor - The interceptor to register
public Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor)
Description copied from interface: MessageHandlerInterceptorSupport
Register the given handlerInterceptor. After registration, the interceptor will be invoked for each handled Message on the messaging component that it was registered to, prior to invoking the message's handler.
Specified by:
registerHandlerInterceptor in interface MessageHandlerInterceptorSupport<CommandMessage<?>>
Parameters:
handlerInterceptor - The interceptor to register
Copyright © 2010–2023. All rights reserved.