public class DisruptorCommandBus extends Object implements CommandBus
Disruptor,
which ensures that two steps are executed sequentially in these threads, while minimizing locking and inter-thread
communication.
The process is split into two separate steps, each of which is executed in a different thread:
This separation of process steps makes this implementation very efficient and highly performing. However, it does
not cope with exceptions very well. When an exception occurs, an Aggregate that has been loaded is potentially
corrupt. That means that an aggregate does not represent a state that can be reproduced by replaying its committed
events. Although this implementation will recover from this corrupt state, it may result in a number of commands
being rejected in the meantime. These command may be retried if the cause of the AggregateStateCorruptedException does not indicate a non-transient error.
Commands that have been executed against a potentially corrupt Aggregate will result in a AggregateStateCorruptedException exception. These commands are automatically rescheduled for processing by
default. Use DisruptorCommandBus.Builder.rescheduleCommandsOnCorruptState(boolean) to disable this feature. Note
that the order in which commands are executed is not fully guaranteed when this feature is enabled (default).
Limitations of this implementation
Although this implementation allows applications to achieve extreme performance (over 1M commands on commodity hardware), it does have some limitations. It only allows a single aggregate to be invoked during command processing.
This implementation can only work with Event Sourced Aggregates. Infrastructure considerations
This CommandBus implementation has special requirements for the Repositories being used during Command Processing.
Therefore, the Repository instance to use in the Command Handler must be created using createRepository(EventStore, AggregateFactory, RepositoryProvider).
Using another repository will most likely result in undefined behavior.
The DisruptorCommandBus must have access to at least 3 threads, two of which are permanently used while the DisruptorCommandBus is operational. At least one additional thread is required to invoke callbacks and initiate a recovery process in the case of exceptions.
Consider providing an alternative IdentifierFactory implementation. The default
implementation used UUID.randomUUID() to generated identifier for Events. The poor performance of
this method severely impacts overall performance of the DisruptorCommandBus. A better performing alternative is, for
example, com.eaio.uuid.UUID
| Modifier and Type | Class and Description |
|---|---|
static class |
DisruptorCommandBus.Builder
Builder class to instantiate a
DisruptorCommandBus. |
| Modifier | Constructor and Description |
|---|---|
protected |
DisruptorCommandBus(DisruptorCommandBus.Builder builder)
Instantiate a
DisruptorCommandBus based on the fields contained in the DisruptorCommandBus.Builder. |
| Modifier and Type | Method and Description |
|---|---|
static DisruptorCommandBus.Builder |
builder()
Instantiate a Builder to be able to create a
DisruptorCommandBus. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given
eventStore and
aggregateFactory. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given
aggregateFactory and sourced from given eventStore. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
ParameterResolverFactory parameterResolverFactory,
HandlerDefinition handlerDefinition,
RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate that is created by the given
aggregateFactory and sourced from given eventStore. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate that is created by the given
eventStore and
aggregateFactory. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition)
Creates a repository instance for an Event Sourced aggregate, source from given
eventStore, that is
created by the given aggregateFactory. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition,
ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate, sourced from given
eventStore, that is
created by the given aggregateFactory. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition,
ParameterResolverFactory parameterResolverFactory,
HandlerDefinition handlerDefinition,
RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate, sourced from given
eventStore, that is
created by the given aggregateFactory. |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition,
RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate, source from given
eventStore, that is
created by the given aggregateFactory. |
<C> void |
dispatch(CommandMessage<C> command)
Dispatch the given
command to the CommandHandler subscribed to the given command's name. |
<C,R> void |
dispatch(CommandMessage<C> command,
CommandCallback<? super C,? super R> callback)
Dispatch the given
command to the CommandHandler subscribed to the given command's name. |
Registration |
registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor)
Register the given DispatchInterceptor.
|
Registration |
registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor)
Register the given
handlerInterceptor. |
void |
stop()
Shuts down the command bus.
|
Registration |
subscribe(String commandName,
MessageHandler<? super CommandMessage<?>> handler)
Subscribe the given
handler to commands with the given commandName. |
protected DisruptorCommandBus(DisruptorCommandBus.Builder builder)
DisruptorCommandBus based on the fields contained in the DisruptorCommandBus.Builder. The Threads
required for command execution are immediately requested from the Configuration's Executor, if any. Otherwise,
they are created.
Will assert that the CommandTargetResolver, MessageMonitor, RollbackConfiguration, ProducerType, WaitStrategy and Cache are not null. Additional verification is done on
the the coolingDownPeriod, publisherThreadCount, bufferSize and invokerThreadCount to check whether they are positive numbers. If any of these checks fails, an AxonConfigurationException will be thrown.
builder - the DisruptorCommandBus.Builder used to instantiate a DisruptorCommandBus instancepublic static DisruptorCommandBus.Builder builder()
DisruptorCommandBus.
The following configurable fields have defaults:
rescheduleCommandsOnCorruptState defaults to true.coolingDownPeriod defaults to 1000.CommandTargetResolver defaults to an AnnotationCommandTargetResolver.publisherThreadCount defaults to 1.MessageMonitor defaults to NoOpMessageMonitor.INSTANCE.RollbackConfiguration defaults to RollbackConfigurationType.UNCHECKED_EXCEPTIONS.bufferSize defaults to 4096.ProducerType defaults to ProducerType.MULTI.WaitStrategy defaults to a BlockingWaitStrategy.invokerThreadCount defaults to 1.Cache defaults to NoCache.INSTANCE.DuplicateCommandHandlerResolver defaults to DuplicateCommandHandlerResolution.logAndOverride().Executor.
The CommandTargetResolver, MessageMonitor, RollbackConfiguration, ProducerType,
WaitStrategy and Cache are a hard requirements. Thus setting them to null will
result in an AxonConfigurationException.
Additionally, the coolingDownPeriod, publisherThreadCount, bufferSize and
invokerThreadCount have a positive number constraint, thus will also result in an
AxonConfigurationException if set otherwise.
DisruptorCommandBuspublic <C> void dispatch(@Nonnull CommandMessage<C> command)
CommandBuscommand to the CommandHandler subscribed to the given command's name. No
feedback is given about the status of the dispatching process. Implementations may return immediately after
asserting a valid handler is registered for the given command.dispatch in interface CommandBusC - The payload type of the command to dispatchcommand - The Command to dispatchGenericCommandMessage.asCommandMessage(Object)public <C,R> void dispatch(@Nonnull CommandMessage<C> command, @Nonnull CommandCallback<? super C,? super R> callback)
CommandBuscommand to the CommandHandler subscribed to the given command's name. When the
command is processed, one of the callback's methods is called, depending on the result of the processing.
There are no guarantees about the successful completion of command dispatching or handling after the method
returns. Implementations are highly recommended to perform basic validation of the command before returning
from this method call.
Implementations must start a UnitOfWork when before dispatching the command, and either commit or rollback after
a successful or failed execution, respectively.dispatch in interface CommandBusC - The payload type of the command to dispatchR - The type of the expected resultcommand - The Command to dispatchcallback - The callback to invoke when command processing is completeGenericCommandMessage.asCommandMessage(Object)public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory)
eventStore and
aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
T - The type of aggregate to create the repository foreventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the Aggregatepublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, RepositoryProvider repositoryProvider)
eventStore and
aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
T - The type of aggregate to create the repository foreventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregaterepositoryProvider - Provides repositories for specified aggregate typespublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)
eventStore, that is
created by the given aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
T - The type of aggregate to create the repository foreventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition - The trigger definition for creating snapshotspublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, RepositoryProvider repositoryProvider)
eventStore, that is
created by the given aggregateFactory. The given decorator is used to decorate event streams.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Note that a second invocation of this method with an aggregate factory for the same aggregate type may
return the same instance as the first invocation, even if the given decorator is different.
T - The type of aggregate to create the repository foreventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition - The trigger definition for creating snapshotsrepositoryProvider - Provides repositories for specified aggregate typespublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)
aggregateFactory and sourced from given eventStore. Parameters of the annotated methods are resolved
using the given parameterResolverFactory.T - The type of aggregate managed by this repositoryeventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregateparameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler
withpublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)
aggregateFactory and sourced from given eventStore. Parameters of the annotated methods are resolved
using the given parameterResolverFactory. The given handlerDefinition is used to create handler
instances.T - The type of aggregate managed by this repositoryeventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregateparameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler
withhandlerDefinition - The handler definition used to create concrete handlersrepositoryProvider - Provides specific for given aggregate typespublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)
eventStore, that is
created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given
parameterResolverFactory.T - The type of aggregate managed by this repositoryeventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition - The trigger definition for snapshotsparameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler
withpublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)
eventStore, that is
created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given
parameterResolverFactory. The given handlerDefinition is used to create handler instances.T - The type of aggregate managed by this repositoryeventStore - The Event Store to retrieve and persist eventsaggregateFactory - The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition - The trigger definition for snapshotsparameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler
withhandlerDefinition - The handler definition used to create concrete handlersrepositoryProvider - Provides repositories for specific aggregate typespublic Registration subscribe(@Nonnull String commandName, @Nonnull MessageHandler<? super CommandMessage<?>> handler)
CommandBushandler to commands with the given commandName.
If a subscription already exists for the given name, the behavior is undefined. Implementations may throw an
Exception to refuse duplicate subscription or alternatively decide whether the existing or new handler
gets the subscription.subscribe in interface CommandBuscommandName - The name of the command to subscribe the handler tohandler - The handler instance that handles the given type of commandhandler. When unsubscribed it will no longer receive commands.public void stop()
@Nonnull public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor)
MessageDispatchInterceptorSupportregisterDispatchInterceptor in interface MessageDispatchInterceptorSupport<CommandMessage<?>>dispatchInterceptor - The interceptor to registerpublic Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor)
MessageHandlerInterceptorSupporthandlerInterceptor. After registration, the interceptor will be invoked for each
handled Message on the messaging component that it was registered to, prior to invoking the message's handler.registerHandlerInterceptor in interface MessageHandlerInterceptorSupport<CommandMessage<?>>handlerInterceptor - The interceptor to registerCopyright © 2010–2023. All rights reserved.