public class DisruptorCommandBus extends Object implements CommandBus
The command handling process is split into two separate steps, each of which is executed in a different thread.
The bus is backed by a Disruptor, which ensures that the two steps are executed sequentially in these threads, while
minimizing locking and inter-thread communication.
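The two-step split can be illustrated with plain JDK executors. The following is only a sketch of the idea, not the Disruptor-based implementation: the class name `TwoStepPipelineSketch`, the thread names, and the string "events" are assumptions made for illustration. Each command is handled on one thread, and its result is handed to a second thread for storage and publication.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the two-step split; this is not Axon's Disruptor-based code.
final class TwoStepPipelineSketch {

    /** Runs each command through step 1 and step 2, with one dedicated thread per step. */
    static List<String> process(List<String> commands) {
        ExecutorService handlerThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler"));
        ExecutorService publisherThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "publisher"));
        List<String> published = Collections.synchronizedList(new ArrayList<>());

        for (String command : commands) {
            handlerThread.execute(() -> {
                // Step 1: "handle" the command and record where that happened.
                String handled = command + " handled-on=" + Thread.currentThread().getName();
                // Step 2: hand the result to the second thread for storage and publication.
                publisherThread.execute(() ->
                        published.add(handled + " published-on=" + Thread.currentThread().getName()));
            });
        }
        // Drain step 1 first so every step 2 task has been submitted, then drain step 2.
        shutdownAndWait(handlerThread);
        shutdownAndWait(publisherThread);
        return new ArrayList<>(published);
    }

    private static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Executor did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for executor", e);
        }
    }

    public static void main(String[] args) {
        process(Arrays.asList("create", "update")).forEach(System.out::println);
    }
}
```

Because each step has exactly one thread, the commands leave the second step in the order in which they entered the first. The sketch uses ordinary executor queues for the hand-off; the real implementation relies on the Disruptor to reduce locking between the threads.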
This separation of process steps makes this implementation very efficient. However, it does
not cope with exceptions very well. When an exception occurs, an Aggregate that has been loaded is potentially
corrupt. That means that the aggregate does not represent a state that can be reproduced by replaying its committed
events. Although this implementation will recover from this corrupt state, it may result in a number of commands
being rejected in the meantime. These commands may be retried if the cause of the AggregateStateCorruptedException
does not indicate a non-transient error.
Commands that have been executed against a potentially corrupt Aggregate will result in an
AggregateStateCorruptedException. These commands are automatically rescheduled for processing by
default. Use DisruptorConfiguration.setRescheduleCommandsOnCorruptState(boolean) to
disable this feature. Note
that the order in which commands are executed is not fully guaranteed when this feature is enabled (default).
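When automatic rescheduling is disabled, a caller may want to retry rejected commands itself. The following is a minimal sketch of that retry rule, assuming the caller sees the failure as an exception. `CorruptStateException` and `NonTransientException` are hypothetical stand-ins defined here for illustration; they are not the Axon exception classes, and `dispatchWithRetry` is not part of the command bus API.

```java
import java.util.function.Supplier;

// Hypothetical retry helper; the exception types below are stand-ins, not Axon classes.
final class CorruptStateRetrySketch {

    /** Stand-in for AggregateStateCorruptedException. */
    static final class CorruptStateException extends RuntimeException {
        CorruptStateException(Throwable cause) {
            super(cause);
        }
    }

    /** Stand-in marker for causes that a retry cannot fix. */
    static final class NonTransientException extends RuntimeException {
    }

    /** Retries the command until it succeeds, the cause is non-transient, or attempts run out. */
    static <R> R dispatchWithRetry(Supplier<R> command, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        CorruptStateException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return command.get();
            } catch (CorruptStateException e) {
                // A non-transient cause will fail again, so give up immediately.
                if (e.getCause() instanceof NonTransientException) {
                    throw e;
                }
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = dispatchWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new CorruptStateException(new IllegalStateException("aggregate being recovered"));
            }
            return "handled";
        }, 5);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

A production retry would normally add a delay between attempts; that is left out here to keep the rule itself visible.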
Limitations of this implementation
Although this implementation allows applications to achieve extreme performance (over 1M commands on commodity hardware), it does have some limitations. It only allows a single aggregate to be invoked during command processing.
This implementation can only work with Event Sourced Aggregates.
Infrastructure considerations
This CommandBus implementation has special requirements for the Repositories being used during Command Processing.
Therefore, the Repository instance to use in the Command Handler must be created using createRepository(EventStore, AggregateFactory, RepositoryProvider).
Using another repository will most likely result in undefined behavior.
The DisruptorCommandBus must have access to at least 3 threads, two of which are permanently used while the DisruptorCommandBus is operational. At least one additional thread is required to invoke callbacks and initiate a recovery process in the case of exceptions.
Consider providing an alternative IdentifierFactory implementation. The default
implementation uses UUID.randomUUID() to generate identifiers for Events. The poor performance of
this method severely impacts overall performance of the DisruptorCommandBus. A better performing alternative is, for
example, com.eaio.uuid.UUID.
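As an illustration of what a cheaper identifier source could look like, the sketch below combines a random per-process prefix with an atomic counter. It is an assumption-laden example, not the library mentioned above: `SequentialIdentifierSketch` is a hypothetical class that does not extend Axon's IdentifierFactory, and how to register a custom factory is not shown here.

```java
import java.security.SecureRandom;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical identifier generator; it does not extend Axon's IdentifierFactory.
final class SequentialIdentifierSketch {

    // Random per-process prefix, drawn once so the SecureRandom cost is paid a single time.
    private static final String NODE_PREFIX = Long.toHexString(new SecureRandom().nextLong());

    // Monotonic counter; each call costs one atomic increment.
    private static final AtomicLong SEQUENCE = new AtomicLong();

    /** Returns an identifier that is unique within this process. */
    static String generateIdentifier() {
        return NODE_PREFIX + "-" + Long.toHexString(SEQUENCE.incrementAndGet());
    }

    public static void main(String[] args) {
        System.out.println(generateIdentifier());
        System.out.println(generateIdentifier());
    }
}
```

Identifiers are guaranteed to be distinct within one process. Across processes, uniqueness depends on the 64-bit random prefixes not colliding, which is a weaker guarantee than a standards-based UUID and should be weighed before using an approach like this.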
| Constructor | Description |
|---|---|
| `DisruptorCommandBus()` | Initialize the DisruptorCommandBus with given resources, using default configuration settings. |
| `DisruptorCommandBus(DisruptorConfiguration configuration)` | Initialize the DisruptorCommandBus with given resources and settings. |
| `DisruptorCommandBus(EventStore eventStore)` | Deprecated. Use `DisruptorCommandBus()` instead. |
| `DisruptorCommandBus(EventStore eventStore, DisruptorConfiguration configuration)` | Deprecated. |
| Modifier and Type | Method | Description |
|---|---|---|
| `<T> Repository<T>` | `createRepository(AggregateFactory<T> aggregateFactory)` | Deprecated. |
| `<T> Repository<T>` | `createRepository(AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)` | Deprecated. |
| `<T> Repository<T>` | `createRepository(AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)` | Deprecated. |
| `<T> Repository<T>` | `createRepository(AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)` | Deprecated. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory)` | Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)` | Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from given eventStore. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)` | Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from given eventStore. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, RepositoryProvider repositoryProvider)` | Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)` | Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)` | Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)` | Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. |
| `<T> Repository<T>` | `createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, RepositoryProvider repositoryProvider)` | Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. |
| `<C> void` | `dispatch(CommandMessage<C> command)` | Dispatch the given command to the CommandHandler subscribed to the given command's name. |
| `<C,R> void` | `dispatch(CommandMessage<C> command, CommandCallback<? super C,R> callback)` | Dispatch the given command to the CommandHandler subscribed to the given command's name. |
| `Registration` | `registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor)` | Register the given DispatchInterceptor. |
| `Registration` | `registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor)` | Register the given handlerInterceptor. |
| `void` | `stop()` | Shuts down the command bus. |
| `Registration` | `subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler)` | Subscribe the given handler to commands with the given commandName. |
@Deprecated public DisruptorCommandBus(EventStore eventStore)
Deprecated. Use DisruptorCommandBus() instead.
Parameters:
eventStore - The EventStore where generated events must be stored
public DisruptorCommandBus()
public DisruptorCommandBus(DisruptorConfiguration configuration)
Parameters:
configuration - The configuration for the command bus
@Deprecated public DisruptorCommandBus(EventStore eventStore, DisruptorConfiguration configuration)
Deprecated.
Parameters:
eventStore - The EventStore where generated events must be stored
configuration - The configuration for the command bus
public <C> void dispatch(CommandMessage<C> command)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to the given command's name.
No feedback is given about the status of the dispatching process. Implementations may return immediately after
asserting a valid handler is registered for the given command.
Specified by:
dispatch in interface CommandBus
Type Parameters:
C - The payload type of the command to dispatch
Parameters:
command - The Command to dispatch
See Also:
GenericCommandMessage.asCommandMessage(Object)
public <C,R> void dispatch(CommandMessage<C> command, CommandCallback<? super C,R> callback)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to the given command's name.
When the command is processed, one of the callback's methods is called, depending on the result of the processing.
When the method returns, the only guarantee provided by the CommandBus implementation is that the command has
been successfully received. Implementations are highly recommended to perform basic validation of the command
before returning from this method call.
Implementations must start a UnitOfWork before dispatching the command, and either commit or roll back after
a successful or failed execution, respectively.
Specified by:
dispatch in interface CommandBus
Type Parameters:
C - The payload type of the command to dispatch
R - The type of the expected result
Parameters:
command - The Command to dispatch
callback - The callback to invoke when command processing is complete
See Also:
GenericCommandMessage.asCommandMessage(Object)
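The callback contract described for dispatch can be modeled in a few lines of plain Java. This is a sketch under stated assumptions, not the CommandBus or CommandCallback API: `ResultCallback`, `CallbackDispatchSketch`, and `outcomeOf` are hypothetical names introduced for illustration. The point it shows is that dispatch returns before the handler has run and that exactly one callback method is invoked afterwards, depending on the outcome.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical model of callback-based dispatch; these are not the Axon interfaces.
final class CallbackDispatchSketch {

    /** Simplified stand-in for a command callback. */
    interface ResultCallback<R> {
        void onSuccess(R result);

        void onFailure(Throwable cause);
    }

    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Queues the handler and returns immediately; the callback fires later on the worker thread. */
    <R> void dispatch(Supplier<R> handler, ResultCallback<? super R> callback) {
        worker.execute(() -> {
            R result;
            try {
                result = handler.get();
            } catch (RuntimeException e) {
                callback.onFailure(e);
                return;
            }
            // Called outside the try block so a failing onSuccess is not reported as a handler failure.
            callback.onSuccess(result);
        });
    }

    void stop() {
        worker.shutdown();
    }

    /** Demo helper: dispatches one handler and blocks until its callback has fired. */
    static String outcomeOf(Supplier<String> handler) {
        CallbackDispatchSketch bus = new CallbackDispatchSketch();
        CompletableFuture<String> outcome = new CompletableFuture<>();
        bus.dispatch(handler, new ResultCallback<String>() {
            @Override
            public void onSuccess(String result) {
                outcome.complete("success:" + result);
            }

            @Override
            public void onFailure(Throwable cause) {
                outcome.complete("failure:" + cause.getMessage());
            }
        });
        try {
            return outcome.join();
        } finally {
            bus.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomeOf(() -> "done"));
        System.out.println(outcomeOf(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```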
@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory)
Deprecated. Use createRepository(EventStore, AggregateFactory, RepositoryProvider) instead.
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate that is created by the given eventStore and aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
repositoryProvider - Provides repositories for specified aggregate types
@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)
Deprecated. Use createRepository(EventStore, AggregateFactory, SnapshotTriggerDefinition, RepositoryProvider) instead.
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for creating snapshots
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)
Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for creating snapshots
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. The given decorator is used to decorate event streams.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Note that a second invocation of this method with an aggregate factory for the same aggregate type may
return the same instance as the first invocation, even if the given decorator is different.
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for creating snapshots
repositoryProvider - Provides repositories for specified aggregate types
@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)
Deprecated. Use createRepository(EventStore, AggregateFactory, ParameterResolverFactory, HandlerDefinition, RepositoryProvider) instead.
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given parameterResolverFactory.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler with
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from given eventStore. Parameters of the annotated methods are resolved using the given parameterResolverFactory.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler with
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory and sourced from given eventStore. Parameters of the annotated methods are resolved using the given parameterResolverFactory. The given handlerDefinition is used to create handler instances.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler with
handlerDefinition - The handler definition used to create concrete handlers
repositoryProvider - Provides repositories for specific aggregate types
@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)
Deprecated. Use createRepository(EventStore, AggregateFactory, SnapshotTriggerDefinition, ParameterResolverFactory, HandlerDefinition, RepositoryProvider) instead.
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given parameterResolverFactory.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for snapshots
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler with
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given parameterResolverFactory.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for snapshots
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler with
public <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory, HandlerDefinition handlerDefinition, RepositoryProvider repositoryProvider)
Creates a repository instance for an Event Sourced aggregate, sourced from given eventStore, that is created by the given aggregateFactory. Parameters of the annotated methods are resolved using the given parameterResolverFactory. The given handlerDefinition is used to create handler instances.
Type Parameters:
T - The type of aggregate managed by this repository
Parameters:
eventStore - The Event Store to retrieve and persist events
aggregateFactory - The factory creating uninitialized instances of the Aggregate
snapshotTriggerDefinition - The trigger definition for snapshots
parameterResolverFactory - The ParameterResolverFactory to resolve parameter values of annotated handler with
handlerDefinition - The handler definition used to create concrete handlers
repositoryProvider - Provides repositories for specific aggregate types
public Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler)
Description copied from interface: CommandBus
Subscribe the given handler to commands with the given commandName.
If a subscription already exists for the given name, the behavior is undefined. Implementations may throw an
Exception to refuse duplicate subscription or alternatively decide whether the existing or new handler gets the subscription.
Specified by:
subscribe in interface CommandBus
Parameters:
commandName - The name of the command to subscribe the handler to
handler - The handler instance that handles the given type of command
Returns:
a handle to unsubscribe the handler. When unsubscribed it will no longer receive commands.
public void stop()
public Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super CommandMessage<?>> dispatchInterceptor)
Description copied from interface: MessageDispatchInterceptorSupport
Register the given DispatchInterceptor.
Specified by:
registerDispatchInterceptor in interface MessageDispatchInterceptorSupport<CommandMessage<?>>
Parameters:
dispatchInterceptor - The interceptor to register
public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super CommandMessage<?>> handlerInterceptor)
Description copied from interface: MessageHandlerInterceptorSupport
Register the given handlerInterceptor. After registration, the interceptor will be invoked for each
handled Message on the messaging component that it was registered to, prior to invoking the message's handler.
Specified by:
registerHandlerInterceptor in interface MessageHandlerInterceptorSupport<CommandMessage<?>>
Parameters:
handlerInterceptor - The interceptor to register
Copyright © 2010–2018. All rights reserved.