public class DisruptorCommandBus extends Object implements CommandBus
Disruptor
,
which ensures that two steps are executed sequentially in these threads, while minimizing locking and inter-thread
communication.
The process is split into two separate steps, each of which is executed in a different thread:
Exceptions and recovery
This separation of process steps makes this implementation very efficient and highly performing. However, it does
not cope with exceptions very well. When an exception occurs, an Aggregate that has been loaded is potentially
corrupt. That means that an aggregate does not represent a state that can be reproduced by replaying its committed
events. Although this implementation will recover from this corrupt state, it may result in a number of commands
being rejected in the meantime. These command may be retried if the cause of the AggregateStateCorruptedException
does not indicate a non-transient error.
Commands that have been executed against a potentially corrupt Aggregate will result in a AggregateStateCorruptedException
exception. These commands are automatically rescheduled for processing by
default. Use DisruptorConfiguration.setRescheduleCommandsOnCorruptState(boolean)
disable this feature. Note
that the order in which commands are executed is not fully guaranteed when this feature is enabled (default).
Limitations of this implementation
Although this implementation allows applications to achieve extreme performance (over 1M commands on commodity hardware), it does have some limitations. It only allows a single aggregate to be invoked during command processing.
This implementation can only work with Event Sourced Aggregates.
Infrastructure considerations
This CommandBus implementation has special requirements for the Repositories being used during Command Processing.
Therefore, the Repository instance to use in the Command Handler must be created using createRepository(EventStore, AggregateFactory)
.
Using another repository will most likely result in undefined behavior.
The DisruptorCommandBus must have access to at least 3 threads, two of which are permanently used while the DisruptorCommandBus is operational. At least one additional thread is required to invoke callbacks and initiate a recovery process in the case of exceptions.
Consider providing an alternative IdentifierFactory
implementation. The default
implementation used UUID.randomUUID()
to generated identifier for Events. The poor performance of
this method severely impacts overall performance of the DisruptorCommandBus. A better performing alternative is, for
example, com.eaio.uuid.UUID
Constructor and Description |
---|
DisruptorCommandBus()
Initialize the DisruptorCommandBus with given resources, using default configuration settings.
|
DisruptorCommandBus(DisruptorConfiguration configuration)
Initialize the DisruptorCommandBus with given resources and settings.
|
DisruptorCommandBus(EventStore eventStore)
Deprecated.
Use
DisruptorCommandBus() instead |
DisruptorCommandBus(EventStore eventStore,
DisruptorConfiguration configuration)
Deprecated.
Use
instead |
Modifier and Type | Method and Description |
---|---|
<T> Repository<T> |
createRepository(AggregateFactory<T> aggregateFactory)
Deprecated.
Use
createRepository(EventStore, AggregateFactory) instead |
<T> Repository<T> |
createRepository(AggregateFactory<T> aggregateFactory,
ParameterResolverFactory parameterResolverFactory)
Deprecated.
|
<T> Repository<T> |
createRepository(AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition)
Deprecated.
|
<T> Repository<T> |
createRepository(AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition,
ParameterResolverFactory parameterResolverFactory)
|
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given
eventStore and aggregateFactory . |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given
aggregateFactory and sourced from given eventStore . |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition)
Creates a repository instance for an Event Sourced aggregate, source from given
eventStore , that is
created by the given aggregateFactory . |
<T> Repository<T> |
createRepository(EventStore eventStore,
AggregateFactory<T> aggregateFactory,
SnapshotTriggerDefinition snapshotTriggerDefinition,
ParameterResolverFactory parameterResolverFactory)
Creates a repository instance for an Event Sourced aggregate, sourced from given
eventStore , that is
created by the given aggregateFactory . |
<C> void |
dispatch(CommandMessage<C> command)
Dispatch the given
command to the CommandHandler subscribed to the given command 's name. |
<C,R> void |
dispatch(CommandMessage<C> command,
CommandCallback<? super C,R> callback)
Dispatch the given
command to the CommandHandler subscribed to the given command 's name. |
void |
stop()
Shuts down the command bus.
|
Registration |
subscribe(String commandName,
MessageHandler<? super CommandMessage<?>> handler)
Subscribe the given
handler to commands with the given commandName . |
@Deprecated public DisruptorCommandBus(EventStore eventStore)
DisruptorCommandBus()
insteadeventStore
- The EventStore where generated events must be storedpublic DisruptorCommandBus()
public DisruptorCommandBus(DisruptorConfiguration configuration)
configuration
- The configuration for the command bus@Deprecated public DisruptorCommandBus(EventStore eventStore, DisruptorConfiguration configuration)
instead
eventStore
- The EventStore where generated events must be storedconfiguration
- The configuration for the command buspublic <C> void dispatch(CommandMessage<C> command)
CommandBus
command
to the CommandHandler subscribed to the given command
's name.
No feedback is given about the status of the dispatching process. Implementations may return immediately after
asserting a valid handler is registered for the given command.dispatch
in interface CommandBus
C
- The payload type of the command to dispatchcommand
- The Command to dispatchGenericCommandMessage.asCommandMessage(Object)
public <C,R> void dispatch(CommandMessage<C> command, CommandCallback<? super C,R> callback)
CommandBus
command
to the CommandHandler subscribed to the given command
's name.
When the command is processed, one of the callback's methods is called, depending on the result of the processing.
When the method returns, the only guarantee provided by the CommandBus implementation is that the command has
been successfully received. Implementations are highly recommended to perform basic validation of the command
before returning from this method call.
Implementations must start a UnitOfWork when before dispatching the command, and either commit or rollback after
a successful or failed execution, respectively.dispatch
in interface CommandBus
C
- The payload type of the command to dispatchR
- The type of the expected resultcommand
- The Command to dispatchcallback
- The callback to invoke when command processing is completeGenericCommandMessage.asCommandMessage(Object)
@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory)
createRepository(EventStore, AggregateFactory)
insteadaggregateFactory
.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
T
- The type of aggregate to create the repository foraggregateFactory
- The factory creating uninitialized instances of the Aggregatepublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory)
eventStore
and aggregateFactory
.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
T
- The type of aggregate to create the repository foreventStore
- The Event Store to retrieve and persist eventsaggregateFactory
- The factory creating uninitialized instances of the Aggregate@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)
createRepository(EventStore, AggregateFactory, SnapshotTriggerDefinition)
instead.aggregateFactory
. The given decorator
is used to decorate event streams.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Note that a second invocation of this method with an aggregate factory for the same aggregate type may
return the same instance as the first invocation, even if the given decorator
is different.
T
- The type of aggregate to create the repository foraggregateFactory
- The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition
- The trigger definition for creating snapshotspublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition)
eventStore
, that is
created by the given aggregateFactory
. The given decorator
is used to decorate event streams.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).
Note that a second invocation of this method with an aggregate factory for the same aggregate type may
return the same instance as the first invocation, even if the given decorator
is different.
T
- The type of aggregate to create the repository foreventStore
- The Event Store to retrieve and persist eventsaggregateFactory
- The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition
- The trigger definition for creating snapshots@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)
createRepository(EventStore, AggregateFactory, ParameterResolverFactory)
instead.aggregateFactory
. Parameters of the annotated methods are resolved using the given
parameterResolverFactory
.T
- The type of aggregate managed by this repositoryaggregateFactory
- The factory creating uninitialized instances of the AggregateparameterResolverFactory
- The ParameterResolverFactory to resolve parameter values of annotated handler
withpublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, ParameterResolverFactory parameterResolverFactory)
aggregateFactory
and sourced from given eventStore
. Parameters of the annotated methods are
resolved using the given parameterResolverFactory
.T
- The type of aggregate managed by this repositoryeventStore
- The Event Store to retrieve and persist eventsaggregateFactory
- The factory creating uninitialized instances of the AggregateparameterResolverFactory
- The ParameterResolverFactory to resolve parameter values of annotated handler
with@Deprecated public <T> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)
createRepository(EventStore, AggregateFactory, SnapshotTriggerDefinition, ParameterResolverFactory)
insteadaggregateFactory
. Parameters of the annotated methods are resolved using the given
parameterResolverFactory
. The given decorator
is used to intercept incoming streams of eventsT
- The type of aggregate managed by this repositoryaggregateFactory
- The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition
- The trigger definition for snapshotsparameterResolverFactory
- The ParameterResolverFactory to resolve parameter values of annotated handler
withpublic <T> Repository<T> createRepository(EventStore eventStore, AggregateFactory<T> aggregateFactory, SnapshotTriggerDefinition snapshotTriggerDefinition, ParameterResolverFactory parameterResolverFactory)
eventStore
, that is
created by the given aggregateFactory
. Parameters of the annotated methods are resolved using the given
parameterResolverFactory
. The given decorator
is used to intercept incoming streams of eventsT
- The type of aggregate managed by this repositoryeventStore
- The Event Store to retrieve and persist eventsaggregateFactory
- The factory creating uninitialized instances of the AggregatesnapshotTriggerDefinition
- The trigger definition for snapshotsparameterResolverFactory
- The ParameterResolverFactory to resolve parameter values of annotated handler
withpublic Registration subscribe(String commandName, MessageHandler<? super CommandMessage<?>> handler)
CommandBus
handler
to commands with the given commandName
.
If a subscription already exists for the given name, the behavior is undefined. Implementations may throw an
Exception to refuse duplicate subscription or alternatively decide whether the existing or new
handler
gets the subscription.subscribe
in interface CommandBus
commandName
- The name of the command to subscribe the handler tohandler
- The handler instance that handles the given type of commandhandler
. When unsubscribed it will no longer receive commands.public void stop()
Copyright © 2010–2018. All rights reserved.