java.lang.Object
  org.axonframework.commandhandling.disruptor.DisruptorCommandBus
public class DisruptorCommandBus
Asynchronous CommandBus implementation with very high performance characteristics. It divides the command handling
process into two steps, which can be executed in different threads. The CommandBus is backed by a Disruptor, which
ensures that the two steps are executed sequentially in these threads, while minimizing locking and inter-thread
communication.
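The two-step hand-off described above can be pictured with a minimal sketch. It assumes a plain JDK `ArrayBlockingQueue` in place of the Disruptor ring buffer (so, unlike the real implementation, it does use locks), and the class name `TwoStepPipeline` and both step bodies are hypothetical placeholders rather than Axon code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TwoStepPipeline {
    // Sentinel that tells the second step no more items will arrive.
    private static final String STOP = "__stop__";

    /** Runs each command through step one and step two on separate threads, in submission order. */
    public static List<String> process(List<String> commands) {
        BlockingQueue<String> handoff = new ArrayBlockingQueue<>(16);
        List<String> published = Collections.synchronizedList(new ArrayList<>());

        // Step one: stands in for command handler execution.
        Thread stepOne = new Thread(() -> {
            try {
                for (String command : commands) {
                    handoff.put("handled:" + command);
                }
                handoff.put(STOP);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Step two: stands in for storing and publishing the resulting events.
        Thread stepTwo = new Thread(() -> {
            try {
                String item;
                while (!(item = handoff.take()).equals(STOP)) {
                    published.add("published:" + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        stepOne.start();
        stepTwo.start();
        try {
            stepOne.join();
            stepTwo.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(published);
    }
}
```

Because a single thread owns each step and the queue is first-in, first-out, every command passes through step one before step two and the overall order is preserved.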
An AggregateStateCorruptedException does not indicate a non-transient error.
Commands that have been executed against a potentially corrupt Aggregate will result in an
AggregateStateCorruptedException. These commands are automatically rescheduled for processing by default. Use
DisruptorConfiguration.setRescheduleCommandsOnCorruptState(boolean) to disable this feature. Note that the order in
which commands are executed is not fully guaranteed when this feature is enabled (default).
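Why rescheduling can change the execution order is easiest to see in a small sketch. The code below is not the Axon implementation: `RescheduleSketch` and `CorruptStateException` are hypothetical stand-ins, and it assumes a failed command is simply appended to the back of the queue, which is only one way a retry could be scheduled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RescheduleSketch {
    /** Hypothetical stand-in for the corrupt-state exception named in the text. */
    static class CorruptStateException extends RuntimeException {
    }

    /**
     * Processes commands in queue order. Commands listed in failOnce fail on their first attempt.
     * With reschedule enabled, a failed command is appended to the back of the queue (an assumption).
     */
    public static List<String> run(List<String> commands, Set<String> failOnce, boolean reschedule) {
        Deque<String> queue = new ArrayDeque<>(commands);
        Set<String> alreadyFailed = new HashSet<>();
        List<String> completed = new ArrayList<>();
        while (!queue.isEmpty()) {
            String command = queue.poll();
            try {
                // Set.add returns true only the first time, so each listed command fails once.
                if (failOnce.contains(command) && alreadyFailed.add(command)) {
                    throw new CorruptStateException();
                }
                completed.add(command);
            } catch (CorruptStateException e) {
                if (reschedule) {
                    queue.addLast(command); // retried after the commands already waiting
                }
                // Without rescheduling, the failure would be reported to the caller; omitted here.
            }
        }
        return completed;
    }
}
```

Under these assumptions, submitting `a`, `b`, `c` with `a` failing once completes them as `b`, `c`, `a` when rescheduling is on, which illustrates why strict ordering cannot be promised.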
Limitations of this implementation
Although this implementation allows applications to achieve extreme performance (over 1M commands on commodity
hardware), it does have some limitations. It only allows a single aggregate to be invoked during command processing.
This implementation can only work with Event Sourced Aggregates.
Infrastructure considerations
This CommandBus implementation has special requirements for the Repositories being used during Command Processing.
Therefore, the Repository instance to use in the Command Handler must be created using
createRepository(org.axonframework.eventsourcing.AggregateFactory). Using another repository will most likely result
in undefined behavior.
The DisruptorCommandBus must have access to at least 3 threads, two of which are permanently used while the
DisruptorCommandBus is operational. At least one additional thread is required to invoke callbacks and initiate a
recovery process in the case of exceptions.
Consider providing an alternative IdentifierFactory implementation. The default implementation uses
UUID.randomUUID() to generate identifiers for Events. The poor performance of this method severely impacts the
overall performance of the DisruptorCommandBus. A better performing alternative is, for example, com.eaio.uuid.UUID.
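As a rough illustration of what a cheaper identifier source might look like, the sketch below pays for one random UUID per JVM and then only increments a counter. This is a different technique from the time-based com.eaio.uuid.UUID named above. The class `CounterIdentifierSketch` and its method name are hypothetical, and how such a generator would be plugged in as an IdentifierFactory is not covered here.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

public class CounterIdentifierSketch {
    // Computed once per JVM; keeps identifiers from different JVMs apart.
    private static final String PREFIX = UUID.randomUUID().toString();
    // Each call after class initialization costs a single atomic increment.
    private static final AtomicLong COUNTER = new AtomicLong();

    /** Returns an identifier that is unique within this JVM and very unlikely to clash across JVMs. */
    public static String generateIdentifier() {
        return PREFIX + "-" + COUNTER.incrementAndGet();
    }
}
```

The trade-off is that these identifiers are longer than a UUID and are not valid UUID strings, so this only fits when identifiers are treated as opaque text.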
Constructor Summary | |
---|---|
DisruptorCommandBus(EventStore eventStore, EventBus eventBus) | Initialize the DisruptorCommandBus with given resources, using default configuration settings. |
DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorConfiguration configuration) | Initialize the DisruptorCommandBus with given resources and settings. |
Method Summary | ||
---|---|---|
<T extends EventSourcedAggregateRoot> Repository<T> | createRepository(AggregateFactory<T> aggregateFactory) | Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory. |
<T extends EventSourcedAggregateRoot> Repository<T> | createRepository(AggregateFactory<T> aggregateFactory, EventStreamDecorator decorator) | Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory. |
void | dispatch(CommandMessage<?> command) | Dispatch the given command to the CommandHandler subscribed to that type of command. |
<R> void | dispatch(CommandMessage<?> command, CommandCallback<R> callback) | Dispatch the given command to the CommandHandler subscribed to that type of command. |
<R> void | doDispatch(CommandMessage command, CommandCallback<R> callback) | Forces a dispatch of a command. |
void | stop() | Shuts down the command bus. |
<C> void | subscribe(String commandName, CommandHandler<? super C> handler) | Subscribe the given handler to commands of type commandType. |
<C> boolean | unsubscribe(String commandName, CommandHandler<? super C> handler) | Unsubscribe the given handler from commands of type commandType. |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
Constructor Detail |
---|
public DisruptorCommandBus(EventStore eventStore, EventBus eventBus)
Initialize the DisruptorCommandBus with given resources, using default configuration settings.
Parameters:
eventStore - The EventStore where generated events must be stored
eventBus - The EventBus where generated events must be published
public DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorConfiguration configuration)
Initialize the DisruptorCommandBus with given resources and settings.
Parameters:
eventStore - The EventStore where generated events must be stored
eventBus - The EventBus where generated events must be published
configuration - The configuration for the command bus
Method Detail |
---|
public void dispatch(CommandMessage<?> command)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to that type of command. No feedback is given about the
status of the dispatching process. Implementations may return immediately after asserting a valid handler is
registered for the given command.
Specified by:
dispatch in interface CommandBus
Parameters:
command - The Command to dispatch
See Also:
GenericCommandMessage.asCommandMessage(Object)
public <R> void dispatch(CommandMessage<?> command, CommandCallback<R> callback)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to that type of command. When the command is processed,
one of the callback methods is called, depending on the result of the processing.
When the method returns, the only guarantee provided by the CommandBus implementation is that the command has been
successfully received. Implementations are highly recommended to perform basic validation of the command before
returning from this method call.
Implementations must start a UnitOfWork before dispatching the command, and either commit or roll back after a
successful or failed execution, respectively.
Specified by:
dispatch in interface CommandBus
Type Parameters:
R - The type of the expected result
Parameters:
command - The Command to dispatch
callback - The callback to invoke when command processing is complete
See Also:
GenericCommandMessage.asCommandMessage(Object)
public <R> void doDispatch(CommandMessage command, CommandCallback<R> callback)
Forces a dispatch of a command.
Type Parameters:
R - The expected return type of the command
Parameters:
command - The command to dispatch
callback - The callback to notify when command handling is completed
public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate
instances. Using any other repository instance may result in undefined behavior, such as concurrency problems.
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory, EventStreamDecorator decorator)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory. The
given decorator is used to decorate event streams.
The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate
instances. Using any other repository instance may result in undefined behavior, such as concurrency problems.
Note that a second invocation of this method with an aggregate factory for the same aggregate type may return the
same instance as the first invocation, even if the given decorator is different.
Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
decorator - The decorator to decorate event streams with
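The note about a second invocation returning the same instance is what one would see if repositories were cached per aggregate type. The sketch below shows that behavior under exactly that assumption. The source only states that the same instance may be returned, and `RepositoryCacheSketch` with its `getOrCreate` method is a hypothetical illustration, not the Axon code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class RepositoryCacheSketch {
    // One cached repository per aggregate type (assumption for this sketch).
    private final Map<Class<?>, Object> repositories = new ConcurrentHashMap<>();

    /**
     * Returns the repository cached for the aggregate type, invoking the creator only on the first request.
     * A creator passed on a later call (for example, one using a different decorator) is ignored.
     */
    @SuppressWarnings("unchecked")
    public <R> R getOrCreate(Class<?> aggregateType, Supplier<R> creator) {
        return (R) repositories.computeIfAbsent(aggregateType, type -> creator.get());
    }
}
```

With such a cache, the decorator supplied on the first call wins, so callers that need a specific decorator should pass it the first time they request a repository for that aggregate type.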
public <C> void subscribe(String commandName, CommandHandler<? super C> handler)
Description copied from interface: CommandBus
Subscribe the given handler to commands of type commandType.
If a subscription already exists for the given type, the behavior is undefined. Implementations may throw an
Exception to refuse duplicate subscription or alternatively decide whether the existing or new handler gets the
subscription.
Specified by:
subscribe in interface CommandBus
Type Parameters:
C - The Type of command
Parameters:
commandName - The name of the command to subscribe the handler to
handler - The handler instance that handles the given type of command
public <C> boolean unsubscribe(String commandName, CommandHandler<? super C> handler)
Description copied from interface: CommandBus
Unsubscribe the given handler from commands of type commandType. If the handler is not currently assigned to that
type of command, no action is taken.
Specified by:
unsubscribe in interface CommandBus
Type Parameters:
C - The Type of command
Parameters:
commandName - The name of the command the handler is subscribed to
handler - The handler instance to unsubscribe from the CommandBus
Returns:
true if this handler is successfully unsubscribed, false if the given handler was not the current handler for the
given commandType.
public void stop()
Shuts down the command bus.
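The subscribe and unsubscribe contract documented above (one handler per command name, and unsubscribe reporting whether the given handler was the current one) can be sketched with a concurrent map. `HandlerRegistrySketch` is a hypothetical class. It resolves the "undefined" duplicate-subscription case by letting the newer handler win, which is only one of the options the contract allows.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class HandlerRegistrySketch<H> {
    private final ConcurrentMap<String, H> handlers = new ConcurrentHashMap<>();

    /** Registers the handler; in this sketch a later subscription replaces an earlier one. */
    public void subscribe(String commandName, H handler) {
        handlers.put(commandName, handler);
    }

    /**
     * Removes the handler only if it is the one currently registered for the command name.
     * Returns true when it was removed, false when another handler (or none) was registered.
     */
    public boolean unsubscribe(String commandName, H handler) {
        // ConcurrentMap.remove(key, value) removes the entry only when it maps to the given value.
        return handlers.remove(commandName, handler);
    }
}
```

Using the two-argument `remove` makes the check and the removal a single atomic step, so a handler cannot be unsubscribed by mistake after another handler has taken over the same command name.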