org.axonframework.commandhandling.disruptor
Class DisruptorConfiguration

java.lang.Object
  extended by org.axonframework.commandhandling.disruptor.DisruptorConfiguration

public class DisruptorConfiguration
extends Object

Configuration object for the DisruptorCommandBus. The DisruptorConfiguration provides access to the options to tweak performance settings. Instances are not thread-safe and should not be altered after they have been used to initialize a DisruptorCommandBus.

Since:
2.0
Author:
Allard Buijze

Field Summary
static int DEFAULT_BUFFER_SIZE
          The default size of the buffer in this configuration
 
Constructor Summary
DisruptorConfiguration()
          Initializes a configuration instance with default settings: ring-buffer size: 4096, blocking wait strategy and multi-threaded producer type.
 
Method Summary
 int getBufferSize()
          Returns the buffer size to use.
 Cache getCache()
          Returns the cache used to store Aggregates loaded by the DisruptorCommandBus.
 CommandTargetResolver getCommandTargetResolver()
          Returns the CommandTargetResolver that is used to find out which Aggregate is to be invoked for a given Command.
 long getCoolingDownPeriod()
          Returns the cooling down period for the shutdown of the DisruptorCommandBus, in milliseconds.
 List<CommandDispatchInterceptor> getDispatchInterceptors()
          Returns the dispatch interceptors for the DisruptorCommandBus.
 Executor getExecutor()
          Returns the Executor providing the processing resources (Threads) for the DisruptorCommandBus.
 List<CommandHandlerInterceptor> getInvokerInterceptors()
          Returns the interceptors that the DisruptorCommandBus invokes in the command handler invocation thread.
 int getInvokerThreadCount()
          Returns the number of threads to use for Command Handler invocation.
 com.lmax.disruptor.dsl.ProducerType getProducerType()
          Returns the producer type to use.
 List<CommandHandlerInterceptor> getPublisherInterceptors()
          Returns the interceptors that the DisruptorCommandBus invokes in the thread that stores and publishes events.
 int getPublisherThreadCount()
          Returns the number of threads to use for storing and publication of generated Events.
 boolean getRescheduleCommandsOnCorruptState()
          Indicates whether commands that failed due to potentially corrupt Aggregate state should be automatically rescheduled for processing.
 RollbackConfiguration getRollbackConfiguration()
          Returns the RollbackConfiguration indicating for which Exceptions the DisruptorCommandBus should perform a rollback, and which exceptions should result in a Commit.
 Class<?> getSerializedRepresentation()
          Returns the type of data the serialized object should be represented in.
 Serializer getSerializer()
          Returns the serializer to perform pre-serialization with, or null if no pre-serialization should be done.
 int getSerializerThreadCount()
          Returns the configured number of threads that should perform the pre-serialization step.
 TransactionManager getTransactionManager()
          Returns the transaction manager to use to manage a transaction around the storage and publication of events.
 com.lmax.disruptor.WaitStrategy getWaitStrategy()
          Returns the WaitStrategy currently configured.
 boolean isPreSerializationConfigured()
          Indicates whether pre-serialization is configured.
 DisruptorConfiguration setBufferSize(int newBufferSize)
          Sets the buffer size to use.
 DisruptorConfiguration setCache(Cache cache)
          Sets the cache in which loaded aggregates will be stored.
 DisruptorConfiguration setCommandTargetResolver(CommandTargetResolver newCommandTargetResolver)
          Sets the CommandTargetResolver that must be used to indicate which Aggregate instance will be invoked by an incoming command.
 DisruptorConfiguration setCoolingDownPeriod(long coolingDownPeriod)
          Sets the cooling down period in milliseconds.
 DisruptorConfiguration setDispatchInterceptors(List<CommandDispatchInterceptor> dispatchInterceptors)
          Configures the CommandDispatchInterceptor to use with the DisruptorCommandBus when commands are dispatched.
 DisruptorConfiguration setExecutor(Executor executor)
          Sets the Executor that provides the processing resources (Threads) for the components of the DisruptorCommandBus.
 DisruptorConfiguration setInvokerInterceptors(List<CommandHandlerInterceptor> invokerInterceptors)
          Configures the CommandHandlerInterceptors to use with the DisruptorCommandBus in the invocation thread.
 DisruptorConfiguration setInvokerThreadCount(int count)
          Sets the number of Threads that should be used to invoke the Command Handlers.
 DisruptorConfiguration setProducerType(com.lmax.disruptor.dsl.ProducerType producerType)
          Sets the producer type to use.
 DisruptorConfiguration setPublisherInterceptors(List<CommandHandlerInterceptor> publisherInterceptors)
          Configures the CommandHandlerInterceptors to use with the DisruptorCommandBus during the publication of changes.
 DisruptorConfiguration setPublisherThreadCount(int count)
          Sets the number of Threads that should be used to store and publish the generated Events.
 DisruptorConfiguration setRescheduleCommandsOnCorruptState(boolean rescheduleCommandsOnCorruptState)
          Sets whether commands that failed because they were executed against potentially corrupted aggregate state should be automatically rescheduled.
 DisruptorConfiguration setRollbackConfiguration(RollbackConfiguration rollbackConfiguration)
          Sets the rollback configuration for the DisruptorCommandBus to use.
 DisruptorConfiguration setSerializedRepresentation(Class<?> newSerializedRepresentation)
          Sets the type of data the serialized object should be represented in.
 DisruptorConfiguration setSerializer(Serializer newSerializer)
          Sets the serializer to perform pre-serialization with, or null if no pre-serialization should be done.
 DisruptorConfiguration setSerializerThreadCount(int newSerializerThreadCount)
          Sets the number of threads that should perform the pre-serialization step.
 DisruptorConfiguration setTransactionManager(TransactionManager newTransactionManager)
          Sets the transaction manager to use to manage a transaction around the storage and publication of events.
 DisruptorConfiguration setWaitStrategy(com.lmax.disruptor.WaitStrategy waitStrategy)
          Sets the WaitStrategy, which is used to make dependent threads wait for tasks to be completed.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Field Detail

DEFAULT_BUFFER_SIZE

public static final int DEFAULT_BUFFER_SIZE
The default size of the buffer in this configuration

See Also:
Constant Field Values

Constructor Detail

DisruptorConfiguration

public DisruptorConfiguration()
Initializes a configuration instance with default settings: ring-buffer size: 4096, blocking wait strategy and multi-threaded producer type.

Method Detail

getWaitStrategy

public com.lmax.disruptor.WaitStrategy getWaitStrategy()
Returns the WaitStrategy currently configured.

Returns:
the WaitStrategy currently configured

setWaitStrategy

public DisruptorConfiguration setWaitStrategy(com.lmax.disruptor.WaitStrategy waitStrategy)
Sets the WaitStrategy, which is used to make dependent threads wait for tasks to be completed. The choice of strategy mainly depends on the number of processors available and the number of tasks other than the DisruptorCommandBus being processed.

The BusySpinWaitStrategy provides the best throughput at the lowest latency, but also puts a heavy claim on available CPU resources. The SleepingWaitStrategy yields lower performance, but leaves resources available for other processes to use.

Defaults to the BlockingWaitStrategy.

Parameters:
waitStrategy - The WaitStrategy to use
Returns:
this for method chaining
See Also:
SleepingWaitStrategy, BlockingWaitStrategy, BusySpinWaitStrategy, YieldingWaitStrategy
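
The trade-off between a busy-spin and a sleeping strategy can be illustrated with plain JDK primitives. The sketch below does not use the LMAX classes and is not how they are implemented; WaitStyleSketch and its methods are hypothetical names chosen for illustration. It only contrasts a consumer that polls in a tight loop with one that parks between polls.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical illustration class; not part of Axon or the LMAX Disruptor. */
final class WaitStyleSketch {

    /** Busy-spin style: polls the cursor in a tight loop, keeping one core fully occupied. */
    static long spinUntil(AtomicLong cursor, long target) {
        long seen;
        while ((seen = cursor.get()) < target) {
            // intentionally empty: the waiting thread never gives up the CPU
        }
        return seen;
    }

    /** Sleeping style: parks briefly between polls, trading latency for idle CPU time. */
    static long sleepUntil(AtomicLong cursor, long target) {
        long seen;
        while ((seen = cursor.get()) < target) {
            LockSupport.parkNanos(100_000L); // back off for roughly 0.1 ms
        }
        return seen;
    }

    /** Publishes the target value from another thread after a short delay, as a producer would. */
    static Thread publishLater(AtomicLong cursor, long target) {
        Thread producer = new Thread(() -> {
            LockSupport.parkNanos(5_000_000L); // roughly 5 ms
            cursor.set(target);
        });
        producer.start();
        return producer;
    }

    public static void main(String[] args) {
        AtomicLong cursor = new AtomicLong(0);
        publishLater(cursor, 1);
        System.out.println("spin saw " + spinUntil(cursor, 1));
        publishLater(cursor, 2);
        System.out.println("sleep saw " + sleepUntil(cursor, 2));
    }
}
```

The spinning consumer reacts as soon as the value is visible but burns a core while waiting; the parking consumer may notice the value slightly later but leaves the core free for other work.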

getExecutor

public Executor getExecutor()
Returns the Executor providing the processing resources (Threads) for the DisruptorCommandBus.

Returns:
the Executor providing the processing resources

setExecutor

public DisruptorConfiguration setExecutor(Executor executor)
Sets the Executor that provides the processing resources (Threads) for the components of the DisruptorCommandBus. The provided executor must be capable of providing the required number of threads. Three threads are required immediately at startup and will not be returned until the CommandBus is stopped. Additional threads are used to invoke callbacks and start a recovery process in case aggregate state has been corrupted. If the executor cannot provide the required threads, the disruptor hangs at startup, waiting for resources to become available.

Defaults to null, causing the DisruptorCommandBus to create the necessary threads itself. In that case, threads are created in the DisruptorCommandBus ThreadGroup.

Parameters:
executor - the Executor that provides the processing resources
Returns:
this for method chaining
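
The startup hang described above can be reproduced with plain JDK executors. The following sketch is a minimal illustration, not Axon code: ExecutorCapacitySketch and allWorkersStart are hypothetical names, and the three long-running tasks merely stand in for the three threads the bus claims at startup. It checks whether an executor hands a thread to every task that will not return its thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration class; not part of Axon. */
final class ExecutorCapacitySketch {

    /** Returns true if every one of the long-running workers got a thread within the timeout. */
    static boolean allWorkersStart(ExecutorService executor, int workers, long timeoutMillis) {
        CountDownLatch started = new CountDownLatch(workers);
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < workers; i++) {
                executor.execute(() -> {
                    started.countDown();
                    try {
                        release.await(); // hold the thread, like a long-lived bus component
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();    // let the workers finish
            executor.shutdownNow(); // and release the pool's threads
        }
    }

    public static void main(String[] args) {
        // A pool of 2 threads queues the third worker, so it never starts.
        System.out.println(allWorkersStart(Executors.newFixedThreadPool(2), 3, 200));
        // A cached pool creates a thread per task, so all three start.
        System.out.println(allWorkersStart(Executors.newCachedThreadPool(), 3, 5000));
    }
}
```

An executor that queues work instead of creating threads, such as an undersized fixed pool, is the kind that would leave the bus waiting. An unbounded pool, or a fixed pool sized with headroom for callbacks and recovery, avoids this.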

getInvokerInterceptors

public List<CommandHandlerInterceptor> getInvokerInterceptors()
Returns the interceptors that the DisruptorCommandBus invokes in the command handler invocation thread.

Returns:
the interceptors invoked in the command handler invocation thread

setInvokerInterceptors

public DisruptorConfiguration setInvokerInterceptors(List<CommandHandlerInterceptor> invokerInterceptors)
Configures the CommandHandlerInterceptors to use with the DisruptorCommandBus in the invocation thread. The interceptors are invoked by the thread that also executes the command handler.

Note that this is *not* the thread that stores and publishes the generated events. See setPublisherInterceptors(java.util.List).

Parameters:
invokerInterceptors - The interceptors to invoke when handling an incoming command
Returns:
this for method chaining

getPublisherInterceptors

public List<CommandHandlerInterceptor> getPublisherInterceptors()
Returns the interceptors that the DisruptorCommandBus invokes in the thread that stores and publishes events.

Returns:
the interceptors invoked in the thread that stores and publishes events

setPublisherInterceptors

public DisruptorConfiguration setPublisherInterceptors(List<CommandHandlerInterceptor> publisherInterceptors)
Configures the CommandHandlerInterceptors to use with the DisruptorCommandBus during the publication of changes. The interceptors are invoked by the thread that also stores and publishes the events.

Parameters:
publisherInterceptors - The interceptors to invoke when storing and publishing the changes made by an incoming command
Returns:
this for method chaining

getDispatchInterceptors

public List<CommandDispatchInterceptor> getDispatchInterceptors()
Returns the dispatch interceptors for the DisruptorCommandBus.

Returns:
the dispatch interceptors for the DisruptorCommandBus

setDispatchInterceptors

public DisruptorConfiguration setDispatchInterceptors(List<CommandDispatchInterceptor> dispatchInterceptors)
Configures the CommandDispatchInterceptor to use with the DisruptorCommandBus when commands are dispatched. The interceptors are invoked by the thread that provides the commands to the command bus.

Parameters:
dispatchInterceptors - The dispatch interceptors to invoke when dispatching a command
Returns:
this for method chaining

getRollbackConfiguration

public RollbackConfiguration getRollbackConfiguration()
Returns the RollbackConfiguration indicating for which Exceptions the DisruptorCommandBus should perform a rollback, and which exceptions should result in a Commit.

Note that only exceptions resulting from Command processing are evaluated. Exceptions that occur while attempting to store or publish events will always result in a Rollback.

Returns:
the RollbackConfiguration used by the DisruptorCommandBus

setRollbackConfiguration

public DisruptorConfiguration setRollbackConfiguration(RollbackConfiguration rollbackConfiguration)
Sets the rollback configuration for the DisruptorCommandBus to use. Defaults to RollbackOnUncheckedExceptionConfiguration, a configuration that commits on checked exceptions and performs a rollback on runtime exceptions.

Parameters:
rollbackConfiguration - the RollbackConfiguration for the DisruptorCommandBus to use
Returns:
this for method chaining
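
The default rule (commit on checked exceptions, roll back on runtime exceptions) can be expressed as a single predicate. This is a sketch under assumptions, not the framework's source: RollbackRuleSketch and rollBackOn are hypothetical names, and treating Error subclasses as rollback-worthy is an assumption that the text above does not state.

```java
import java.io.IOException;

/** Hypothetical illustration class; not part of Axon. */
final class RollbackRuleSketch {

    /**
     * Returns true when the failure should trigger a rollback.
     * Checked exceptions commit; RuntimeExceptions roll back.
     * Assumption: Errors (which are also unchecked) roll back as well.
     */
    static boolean rollBackOn(Throwable failure) {
        return !(failure instanceof Exception) || failure instanceof RuntimeException;
    }

    public static void main(String[] args) {
        System.out.println(rollBackOn(new IOException("checked")));             // commit
        System.out.println(rollBackOn(new IllegalStateException("unchecked"))); // rollback
    }
}
```

As noted for getRollbackConfiguration, a rule like this only applies to exceptions raised during command processing; failures while storing or publishing events always roll back.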

getRescheduleCommandsOnCorruptState

public boolean getRescheduleCommandsOnCorruptState()
Indicates whether commands that failed due to potentially corrupt Aggregate state should be automatically rescheduled for processing.

Returns:
true if commands are automatically rescheduled, otherwise false

setRescheduleCommandsOnCorruptState

public DisruptorConfiguration setRescheduleCommandsOnCorruptState(boolean rescheduleCommandsOnCorruptState)
Sets whether commands that failed because they were executed against potentially corrupted aggregate state should be automatically rescheduled. Commands that caused the aggregate state to become corrupted are never automatically rescheduled, to prevent poison message syndrome.

Defaults to true.

Parameters:
rescheduleCommandsOnCorruptState - whether or not to automatically reschedule commands that failed due to potentially corrupted aggregate state.
Returns:
this for method chaining

getCoolingDownPeriod

public long getCoolingDownPeriod()
Returns the cooling down period for the shutdown of the DisruptorCommandBus, in milliseconds. This is the time in which new commands are no longer accepted, but the DisruptorCommandBus may reschedule Commands that may have been executed against a corrupted Aggregate. If no commands have been rescheduled during this period, the disruptor shuts down completely. Otherwise, it waits until no more commands are scheduled for processing.

Returns:
the cooling down period for the shutdown of the DisruptorCommandBus, in milliseconds.

setCoolingDownPeriod

public DisruptorConfiguration setCoolingDownPeriod(long coolingDownPeriod)
Sets the cooling down period in milliseconds. This is the time in which new commands are no longer accepted, but the DisruptorCommandBus may reschedule Commands that may have been executed against a corrupted Aggregate. If no commands have been rescheduled during this period, the disruptor shuts down completely. Otherwise, it waits until no more commands are scheduled for processing.

Defaults to 1000 (1 second).

Parameters:
coolingDownPeriod - the cooling down period for the shutdown of the DisruptorCommandBus, in milliseconds.
Returns:
this for method chaining

getCache

public Cache getCache()
Returns the cache used to store Aggregates loaded by the DisruptorCommandBus.

Returns:
the cache used to store Aggregates

setCache

public DisruptorConfiguration setCache(Cache cache)
Sets the cache in which loaded aggregates will be stored. Aggregates that are not active in the CommandBus' buffer will be loaded from this cache. If they are not in the cache, a new instance will be constructed using Events from the Event Store.

By default, no cache is used.

Parameters:
cache - The cache to store loaded aggregates in.
Returns:
this for method chaining

getCommandTargetResolver

public CommandTargetResolver getCommandTargetResolver()
Returns the CommandTargetResolver that is used to find out which Aggregate is to be invoked for a given Command.

Returns:
the CommandTargetResolver that is used to find out which Aggregate is to be invoked for a given Command

setCommandTargetResolver

public DisruptorConfiguration setCommandTargetResolver(CommandTargetResolver newCommandTargetResolver)
Sets the CommandTargetResolver that must be used to indicate which Aggregate instance will be invoked by an incoming command. The DisruptorCommandBus only uses this value if invokerThreadCount, serializerThreadCount or publisherThreadCount is greater than 1.

Defaults to an AnnotationCommandTargetResolver instance.

Parameters:
newCommandTargetResolver - The CommandTargetResolver to use to indicate which Aggregate instance is target of an incoming Command
Returns:
this for method chaining

getInvokerThreadCount

public int getInvokerThreadCount()
Returns the number of threads to use for Command Handler invocation.

Returns:
the number of threads to use for Command Handler invocation

setInvokerThreadCount

public DisruptorConfiguration setInvokerThreadCount(int count)
Sets the number of Threads that should be used to invoke the Command Handlers. Defaults to 1.

A good value for this setting mainly depends on the number of cores your machine has, as well as the amount of I/O that the process requires. If no I/O is involved, a good range is 1 .. ([processor count] / 2).

Parameters:
count - The number of Threads to use for Command Handler invocation
Returns:
this for method chaining
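
The suggested range of 1 .. ([processor count] / 2) can be derived at startup from the JVM's reported processor count. The helper below is a minimal sketch; InvokerThreadCountSketch and upperBound are hypothetical names, and clamping the result to at least 1 is an assumption made so that single-core machines still get one thread.

```java
/** Hypothetical illustration class; not part of Axon. */
final class InvokerThreadCountSketch {

    /** Upper end of the suggested range: half the processors, but never fewer than one thread. */
    static int upperBound(int processorCount) {
        if (processorCount < 1) {
            throw new IllegalArgumentException("processorCount must be at least 1");
        }
        return Math.max(1, processorCount / 2);
    }

    public static void main(String[] args) {
        int processors = Runtime.getRuntime().availableProcessors();
        int count = upperBound(processors);
        // The result would then be passed to setInvokerThreadCount(count).
        System.out.println(processors + " processors -> up to " + count + " invoker threads");
    }
}
```

This only covers the no-I/O case described above; when command handlers block on I/O, a suitable value has to be found by measurement.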

getPublisherThreadCount

public int getPublisherThreadCount()
Returns the number of threads to use for storing and publication of generated Events.

Returns:
the number of threads to use for storing and publication of generated Events

setPublisherThreadCount

public DisruptorConfiguration setPublisherThreadCount(int count)
Sets the number of Threads that should be used to store and publish the generated Events. Defaults to 1.

A good value for this setting mainly depends on the number of cores your machine has, as well as the amount of I/O that the process requires. If no I/O is involved, a good starting value is [processors / 2].

Parameters:
count - The number of Threads to use for publishing
Returns:
this for method chaining

getSerializerThreadCount

public int getSerializerThreadCount()
Returns the configured number of threads that should perform the pre-serialization step. This value is ignored unless a serializer is set using setSerializer(org.axonframework.serializer.Serializer).

Returns:
the number of threads to perform pre-serialization with

setSerializerThreadCount

public DisruptorConfiguration setSerializerThreadCount(int newSerializerThreadCount)
Sets the number of threads that should perform the pre-serialization step. This value is ignored unless a serializer is set using setSerializer(org.axonframework.serializer.Serializer).

Parameters:
newSerializerThreadCount - the number of threads to perform pre-serialization with
Returns:
this for method chaining

getSerializer

public Serializer getSerializer()
Returns the serializer to perform pre-serialization with, or null if no pre-serialization should be done.

Returns:
the serializer to perform pre-serialization with, or null if no pre-serialization should be done

setSerializer

public DisruptorConfiguration setSerializer(Serializer newSerializer)
Sets the serializer to perform pre-serialization with, or null if no pre-serialization should be done. Defaults to null.

Parameters:
newSerializer - the serializer to perform pre-serialization with, or null if no pre-serialization should be done
Returns:
this for method chaining

isPreSerializationConfigured

public boolean isPreSerializationConfigured()
Indicates whether pre-serialization is configured. Returns true when a serializer and at least one serializer thread are configured.

Returns:
whether pre-serialization is configured

getSerializedRepresentation

public Class<?> getSerializedRepresentation()
Returns the type of data the serialized object should be represented in. Defaults to a byte array.

Returns:
the type of data the serialized object should be represented in

setSerializedRepresentation

public DisruptorConfiguration setSerializedRepresentation(Class<?> newSerializedRepresentation)
Sets the type of data the serialized object should be represented in. Defaults to a byte array (byte[]).

Parameters:
newSerializedRepresentation - the type of data the serialized object should be represented in. May not be null.
Returns:
this for method chaining

getTransactionManager

public TransactionManager getTransactionManager()
Returns the transaction manager to use to manage a transaction around the storage and publication of events.

Returns:
the transaction manager to use to manage a transaction around the storage and publication of events, or null if none is configured.

setTransactionManager

public DisruptorConfiguration setTransactionManager(TransactionManager newTransactionManager)
Sets the transaction manager to use to manage a transaction around the storage and publication of events. The default (null) is to not have publication and storage of events wrapped in a transaction.

Parameters:
newTransactionManager - the transaction manager to use to manage a transaction around the storage and publication of events
Returns:
this for method chaining

getBufferSize

public int getBufferSize()
Returns the buffer size to use.

Returns:
the buffer size to use.

setBufferSize

public DisruptorConfiguration setBufferSize(int newBufferSize)
Sets the buffer size to use. The default is 4096.

Parameters:
newBufferSize - the buffer size to use
Returns:
this for method chaining
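
The underlying LMAX Disruptor ring buffer requires a size that is a power of two (the default of 4096 is one). Whether setBufferSize itself validates this is not stated here, so the sketch below assumes the caller normalizes the value first. BufferSizeSketch and nextPowerOfTwo are hypothetical names.

```java
/** Hypothetical illustration class; not part of Axon or the LMAX Disruptor. */
final class BufferSizeSketch {

    /** Largest power of two that fits in a positive int. */
    private static final int MAX_SIZE = 1 << 30;

    /** Rounds the requested size up to the nearest power of two. */
    static int nextPowerOfTwo(int requested) {
        if (requested < 1 || requested > MAX_SIZE) {
            throw new IllegalArgumentException("requested size must be between 1 and " + MAX_SIZE);
        }
        int highest = Integer.highestOneBit(requested);
        // Already a power of two: keep it. Otherwise move up to the next one.
        return highest == requested ? requested : highest << 1;
    }

    public static void main(String[] args) {
        // The result would then be passed to setBufferSize(size).
        System.out.println(nextPowerOfTwo(4096));
        System.out.println(nextPowerOfTwo(5000));
    }
}
```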

getProducerType

public com.lmax.disruptor.dsl.ProducerType getProducerType()
Returns the producer type to use.

Returns:
the producer type to use.

setProducerType

public DisruptorConfiguration setProducerType(com.lmax.disruptor.dsl.ProducerType producerType)
Sets the producer type to use. The default is to use a multi-threaded producer type.

Parameters:
producerType - the producer type to use
Returns:
this for method chaining


Copyright © 2010-2016. All Rights Reserved.