org.axonframework.commandhandling.disruptor
Class DisruptorCommandBus

java.lang.Object
  extended by org.axonframework.commandhandling.disruptor.DisruptorCommandBus
All Implemented Interfaces:
CommandBus

public class DisruptorCommandBus
extends Object
implements CommandBus

Asynchronous CommandBus implementation with very high performance characteristics. It divides the command handling process into two steps, which can be executed in different threads. The CommandBus is backed by a Disruptor, which ensures that the two steps are executed sequentially in these threads, while minimizing locking and inter-thread communication.

The process is split into two separate steps, each of which is executed in a different thread:

  1. Command Handler execution
    This process invokes the command handler with the incoming command. The result and changes to the aggregate are recorded for the next step.
  2. Event storage and publication
    This process stores all generated domain events and publishes them (with any optional application events) to the event bus. Finally, an asynchronous task is scheduled to invoke the command handler callback with the result of the command handling.
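
The two steps above can be pictured as a small hand-off pipeline. The following is a minimal sketch only: it uses plain java.util.concurrent queues and threads instead of a Disruptor ring buffer, and every name in it (TwoStepPipelineSketch, process, the "handled:" and "published:" strings) is hypothetical and not part of Axon. It shows only that each step runs in its own thread while entries keep their order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class TwoStepPipelineSketch {

    // Marker that tells a step to stop; assumed never to be a real command.
    private static final String STOP = "\u0000stop";

    /** Runs all commands through both steps and returns what step 2 "published", in order. */
    static List<String> process(List<String> commands) {
        BlockingQueue<String> incoming = new ArrayBlockingQueue<>(16);
        BlockingQueue<String> handled = new ArrayBlockingQueue<>(16);
        List<String> published = new ArrayList<>();

        // Step 1: "invoke the command handler" and record the result for step 2.
        Thread handlerThread = new Thread(() -> {
            try {
                for (String c = incoming.take(); !c.equals(STOP); c = incoming.take()) {
                    handled.put("handled:" + c);
                }
                handled.put(STOP);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Step 2: "store and publish" whatever step 1 recorded.
        Thread publisherThread = new Thread(() -> {
            try {
                for (String r = handled.take(); !r.equals(STOP); r = handled.take()) {
                    published.add("published:" + r);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        handlerThread.setDaemon(true);
        publisherThread.setDaemon(true);
        handlerThread.start();
        publisherThread.start();
        try {
            for (String command : commands) {
                incoming.put(command);
            }
            incoming.put(STOP);
            handlerThread.join();
            publisherThread.join(); // join() makes the publisher's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the pipeline", e);
        }
        return published;
    }

    public static void main(String[] args) {
        System.out.println(process(Arrays.asList("create", "update")));
    }
}
```

A bounded queue per hand-off is the simplest stand-in for the ring buffer; it does not reproduce the Disruptor's low-locking behavior, which is the actual source of the performance described here.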

Exceptions and recovery

This separation of process steps makes this implementation very efficient. However, it does not cope with exceptions very well. When an exception occurs, an Aggregate that has been loaded is potentially corrupt, meaning that its state may not be reproducible by replaying its committed events. Although this implementation will recover from this corrupt state, a number of commands may be rejected in the meantime. These commands may be retried if the cause of the AggregateStateCorruptedException does not indicate a non-transient error.

Commands that have been executed against a potentially corrupt Aggregate will result in an AggregateStateCorruptedException. These commands are automatically rescheduled for processing by default. Use DisruptorConfiguration.setRescheduleCommandsOnCorruptState(boolean) to disable this feature. Note that the order in which commands are executed is not fully guaranteed when this feature is enabled (the default).
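
When automatic rescheduling is disabled, the caller has to decide whether a rejected command is worth retrying. The sketch below illustrates that decision in plain Java. It does not use Axon: CorruptStateException is a hypothetical stand-in for AggregateStateCorruptedException, and treating an IllegalArgumentException cause as non-transient is an assumption made only for this example.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class RetrySketch {

    /** Hypothetical stand-in for AggregateStateCorruptedException. */
    static final class CorruptStateException extends RuntimeException {
        CorruptStateException(Throwable cause) {
            super(cause);
        }
    }

    /** Assumption for this sketch: only an IllegalArgumentException cause is non-transient. */
    static boolean isTransient(CorruptStateException e) {
        return !(e.getCause() instanceof IllegalArgumentException);
    }

    /** Runs the command, retrying up to maxAttempts times while the failure looks transient. */
    static <R> R executeWithRetry(Supplier<R> command, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        CorruptStateException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return command.get();
            } catch (CorruptStateException e) {
                if (!isTransient(e)) {
                    throw e; // retrying cannot help
                }
                last = e;
            }
        }
        throw last; // every attempt failed with a transient cause
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = executeWithRetry(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new CorruptStateException(new IOException("temporary failure"));
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```

A retried command runs after commands that were dispatched later, so the ordering caveat above applies to manual retries as well.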

Limitations of this implementation

Although this implementation allows applications to achieve extreme performance (over 1M commands on commodity hardware), it does have some limitations. It only allows a single aggregate to be invoked during command processing.

This implementation can only work with Event Sourced Aggregates.

Infrastructure considerations

This CommandBus implementation has special requirements for the Repositories being used during Command Processing. Therefore, the Repository instance to use in the Command Handler must be created using createRepository(org.axonframework.eventsourcing.AggregateFactory). Using another repository will most likely result in undefined behavior.

The DisruptorCommandBus must have access to at least 3 threads, two of which are permanently used while the DisruptorCommandBus is operational. At least one additional thread is required to invoke callbacks and initiate a recovery process in the case of exceptions.

Consider providing an alternative IdentifierFactory implementation. The default implementation uses UUID.randomUUID() to generate identifiers for Events. The poor performance of this method severely impacts overall performance of the DisruptorCommandBus. A better performing alternative is, for example, com.eaio.uuid.UUID.
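
To illustrate the kind of alternative meant here, the sketch below contrasts a generator that calls UUID.randomUUID() for every identifier with one that draws a single random prefix and then increments a counter. This is not Axon's IdentifierFactory and not com.eaio.uuid.UUID: the IdGenerator interface and both classes are hypothetical, and the counter-based identifiers are unique only under the assumption that the random prefix is unique per generator instance.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

final class IdentifierSketch {

    /** Hypothetical interface for this example; not Axon's IdentifierFactory. */
    interface IdGenerator {
        String generateIdentifier();
    }

    /** Mirrors the default described above: one random UUID per identifier. */
    static final class RandomUuidGenerator implements IdGenerator {
        @Override
        public String generateIdentifier() {
            return UUID.randomUUID().toString();
        }
    }

    /** Pays for randomness once, at construction, then only increments a counter. */
    static final class PrefixedCounterGenerator implements IdGenerator {
        private final String prefix = UUID.randomUUID().toString();
        private final AtomicLong sequence = new AtomicLong();

        @Override
        public String generateIdentifier() {
            return prefix + "-" + sequence.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        IdGenerator generator = new PrefixedCounterGenerator();
        System.out.println(generator.generateIdentifier());
        System.out.println(generator.generateIdentifier());
    }
}
```

Whether such a generator is actually faster in a given deployment should be measured rather than assumed.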

Since:
2.0
Author:
Allard Buijze

Constructor Summary
DisruptorCommandBus(EventStore eventStore, EventBus eventBus)
          Initialize the DisruptorCommandBus with given resources, using default configuration settings.
DisruptorCommandBus(EventStore eventStore, EventBus eventBus, DisruptorConfiguration configuration)
          Initialize the DisruptorCommandBus with given resources and settings.
 
Method Summary
<T extends EventSourcedAggregateRoot>
Repository<T>
createRepository(AggregateFactory<T> aggregateFactory)
          Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory.
<T extends EventSourcedAggregateRoot>
Repository<T>
createRepository(AggregateFactory<T> aggregateFactory, EventStreamDecorator decorator)
          Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory.
 void dispatch(CommandMessage<?> command)
          Dispatch the given command to the CommandHandler subscribed to that type of command.
<R> void
dispatch(CommandMessage<?> command, CommandCallback<R> callback)
          Dispatch the given command to the CommandHandler subscribed to that type of command.
<R> void
doDispatch(CommandMessage command, CommandCallback<R> callback)
          Forces a dispatch of a command.
 void stop()
          Shuts down the command bus.
<C> void
subscribe(String commandName, CommandHandler<? super C> handler)
          Subscribe the given handler to commands with the given commandName.
<C> boolean
unsubscribe(String commandName, CommandHandler<? super C> handler)
          Unsubscribe the given handler from commands with the given commandName.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

DisruptorCommandBus

public DisruptorCommandBus(EventStore eventStore,
                           EventBus eventBus)
Initialize the DisruptorCommandBus with given resources, using default configuration settings. Uses a Blocking WaitStrategy on a RingBuffer of size 4096. The two threads required for command execution are created immediately. Additional threads are used to invoke response callbacks and to initiate a recovery process in the case of errors.

Parameters:
eventStore - The EventStore where generated events must be stored
eventBus - The EventBus where generated events must be published

DisruptorCommandBus

public DisruptorCommandBus(EventStore eventStore,
                           EventBus eventBus,
                           DisruptorConfiguration configuration)
Initialize the DisruptorCommandBus with given resources and settings. The Threads required for command execution are immediately requested from the Configuration's Executor, if any. Otherwise, they are created.

Parameters:
eventStore - The EventStore where generated events must be stored
eventBus - The EventBus where generated events must be published
configuration - The configuration for the command bus

Method Detail

dispatch

public void dispatch(CommandMessage<?> command)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to that type of command. No feedback is given about the status of the dispatching process. Implementations may return immediately after asserting a valid handler is registered for the given command.

Specified by:
dispatch in interface CommandBus
Parameters:
command - The Command to dispatch
See Also:
GenericCommandMessage.asCommandMessage(Object)

dispatch

public <R> void dispatch(CommandMessage<?> command,
                         CommandCallback<R> callback)
Description copied from interface: CommandBus
Dispatch the given command to the CommandHandler subscribed to that type of command. When the command is processed, one of the callback methods is called, depending on the result of the processing.

When the method returns, the only guarantee provided by the CommandBus implementation is that the command has been successfully received. Implementations are strongly encouraged to perform basic validation of the command before returning from this method call.

Implementations must start a UnitOfWork before dispatching the command, and either commit or roll back after a successful or failed execution, respectively.

Specified by:
dispatch in interface CommandBus
Type Parameters:
R - The type of the expected result
Parameters:
command - The Command to dispatch
callback - The callback to invoke when command processing is complete
See Also:
GenericCommandMessage.asCommandMessage(Object)

doDispatch

public <R> void doDispatch(CommandMessage command,
                           CommandCallback<R> callback)
Forces a dispatch of a command. This method should be used with caution. It allows commands to be retried during the cooling down period of the disruptor.

Type Parameters:
R - The expected return type of the command
Parameters:
command - The command to dispatch
callback - The callback to notify when command handling is completed

createRepository

public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory.

The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).

Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
Returns:
the repository that provides access to stored aggregates

createRepository

public <T extends EventSourcedAggregateRoot> Repository<T> createRepository(AggregateFactory<T> aggregateFactory,
                                                                            EventStreamDecorator decorator)
Creates a repository instance for an Event Sourced aggregate that is created by the given aggregateFactory. The given decorator is used to decorate event streams.

The repository returned must be used by Command Handlers subscribed to this Command Bus for loading aggregate instances. Using any other repository instance may result in undefined outcome (a.k.a. concurrency problems).

Note that a second invocation of this method with an aggregate factory for the same aggregate type may return the same instance as the first invocation, even if the given decorator is different.

Type Parameters:
T - The type of aggregate to create the repository for
Parameters:
aggregateFactory - The factory creating uninitialized instances of the Aggregate
decorator - The decorator to decorate event streams with
Returns:
the repository that provides access to stored aggregates
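
The note above, that a second invocation for the same aggregate type may return the first repository even when a different decorator is passed, is what one would see if repositories were cached per aggregate type. The sketch below shows that effect with a plain map. It is an assumption about how such behavior can arise, not a description of the actual implementation, and Repo, the String decorator, and the Class key are all hypothetical simplifications.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class RepositoryCacheSketch {

    /** Hypothetical stand-in for a repository; it remembers the decorator it was built with. */
    static final class Repo {
        final Class<?> aggregateType;
        final String decorator;

        Repo(Class<?> aggregateType, String decorator) {
            this.aggregateType = aggregateType;
            this.decorator = decorator;
        }
    }

    private final Map<Class<?>, Repo> repositories = new ConcurrentHashMap<>();

    /** Creates a repository on first use; later calls for the same type reuse it. */
    Repo createRepository(Class<?> aggregateType, String decorator) {
        // The decorator is consulted only when no repository exists yet for this type.
        return repositories.computeIfAbsent(aggregateType, type -> new Repo(type, decorator));
    }

    public static void main(String[] args) {
        RepositoryCacheSketch bus = new RepositoryCacheSketch();
        Repo first = bus.createRepository(String.class, "decoratorA");
        Repo second = bus.createRepository(String.class, "decoratorB");
        System.out.println("same instance: " + (first == second));
        System.out.println("decorator in use: " + second.decorator);
    }
}
```

The practical consequence is to create the repository once per aggregate type, with the intended decorator, and to share that instance between command handlers.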

subscribe

public <C> void subscribe(String commandName,
                          CommandHandler<? super C> handler)
Description copied from interface: CommandBus
Subscribe the given handler to commands with the given commandName.

If a subscription already exists for the given type, the behavior is undefined. Implementations may throw an Exception to refuse duplicate subscription or alternatively decide whether the existing or new handler gets the subscription.

Specified by:
subscribe in interface CommandBus
Type Parameters:
C - The Type of command
Parameters:
commandName - The name of the command to subscribe the handler to
handler - The handler instance that handles the given type of command

unsubscribe

public <C> boolean unsubscribe(String commandName,
                               CommandHandler<? super C> handler)
Description copied from interface: CommandBus
Unsubscribe the given handler from commands with the given commandName. If the handler is not currently assigned to that command name, no action is taken.

Specified by:
unsubscribe in interface CommandBus
Type Parameters:
C - The Type of command
Parameters:
commandName - The name of the command the handler is subscribed to
handler - The handler instance to unsubscribe from the CommandBus
Returns:
true if this handler is successfully unsubscribed, false if the given handler was not the current handler for the given commandName.
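
The subscribe and unsubscribe contracts described above can be illustrated with a small registry. This is a sketch under stated assumptions, not the DisruptorCommandBus implementation: SubscriptionSketch and its Handler interface are hypothetical, and letting a new handler replace an existing one is only one of the behaviors the CommandBus contract permits for duplicate subscriptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SubscriptionSketch {

    /** Hypothetical handler type for this example. */
    interface Handler {
        Object handle(Object command);
    }

    private final ConcurrentMap<String, Handler> handlers = new ConcurrentHashMap<>();

    /** Assumption: a later subscription replaces an earlier one for the same name. */
    void subscribe(String commandName, Handler handler) {
        handlers.put(commandName, handler);
    }

    /** Removes the mapping only if the given handler is the one currently subscribed. */
    boolean unsubscribe(String commandName, Handler handler) {
        return handlers.remove(commandName, handler);
    }

    public static void main(String[] args) {
        SubscriptionSketch bus = new SubscriptionSketch();
        Handler current = command -> "handled by current";
        Handler other = command -> "handled by other";

        bus.subscribe("CreateOrder", current);
        System.out.println(bus.unsubscribe("CreateOrder", other));   // not the current handler
        System.out.println(bus.unsubscribe("CreateOrder", current)); // the current handler
    }
}
```

The two-argument remove makes the check and the removal a single atomic step, so a handler that was already replaced cannot remove its successor.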

stop

public void stop()
Shuts down the command bus. It no longer accepts new commands, and finishes processing commands that have already been published. This method will not shut down any executor that has been provided as part of the Configuration.



Copyright © 2010-2016. All Rights Reserved.