org.axonframework.saga.annotation
Class AsyncAnnotatedSagaManager

java.lang.Object
  extended by org.axonframework.saga.AbstractReplayAwareSagaManager
      extended by org.axonframework.saga.annotation.AsyncAnnotatedSagaManager
All Implemented Interfaces:
Subscribable, EventListener, EventListenerProxy, EventProcessingMonitorSupport, ReplayAware, SagaManager

public class AsyncAnnotatedSagaManager
extends AbstractReplayAwareSagaManager
implements EventProcessingMonitorSupport, Subscribable

A SagaManager implementation that processes Sagas asynchronously. Incoming events are placed on a queue and processed by a given number of processors. Modified saga state is persisted in batches to the repository, to minimize communication overhead with back-ends.

This SagaManager implementation guarantees a "happens before" type processing for each Saga. That means that the behavior of asynchronously processed events is exactly identical as the behavior if the events were processed completely sequentially.

Since:
2.0
Author:
Allard Buijze

Constructor Summary
AsyncAnnotatedSagaManager(Class<? extends AbstractAnnotatedSaga>... sagaTypes)
          Initializes an Asynchronous Saga Manager using default values for the given sagaTypes.
AsyncAnnotatedSagaManager(EventBus eventBus, Class<? extends AbstractAnnotatedSaga>... sagaTypes)
          Deprecated. use AsyncAnnotatedSagaManager(Class[]) and register with the event bus using EventBus.subscribe(org.axonframework.eventhandling.EventListener)
AsyncAnnotatedSagaManager(ParameterResolverFactory parameterResolverFactory, Class<? extends AbstractAnnotatedSaga>... sagaTypes)
          Initializes an Asynchronous Saga Manager using default values for the given sagaTypes.
 
Method Summary
 Class<?> getTargetType()
          Returns the instance type that this proxy delegates all event handling to.
 void handle(EventMessage event)
          Handles the event by passing it to all Saga instances that have an Association Value found in the given event.
 void setBufferSize(int bufferSize)
          Sets the size of the processing buffer.
 void setCorrelationDataProvider(CorrelationDataProvider<? super EventMessage> correlationDataProvider)
          Sets the CorrelationDataProvider which provides the Meta Data to assign to messages generated by the Sagas that are managed by this SagaManager instance.
 void setCorrelationDataProviders(List<? extends CorrelationDataProvider<? super EventMessage>> correlationDataProviders)
          Sets the CorrelationDataProviders which provide the Meta Data to assign to messages generated by the Sagas that are managed by this SagaManager instance.
 void setErrorHandler(ErrorHandler errorHandler)
          Sets the ErrorHandler instance, which defines the behavior in case an error occurs while loading or invoking Sagas for an incoming Event.
 void setExecutor(Executor executor)
          Sets the executor that provides the threads for the processors.
 void setProcessorCount(int processorCount)
          Sets the number of processors (threads) to process events with.
 void setSagaFactory(SagaFactory sagaFactory)
          Sets the SagaFactory responsible for creating new Saga instances when required.
 void setSagaRepository(SagaRepository sagaRepository)
          Sets the saga repository to store and load Sagas from.
 void setStartTimeout(long startTimeout)
          Sets the amount of time (in milliseconds) the AsyncSagaManager will wait for the async processors to be assigned a thread from the executor.
 void setTransactionManager(TransactionManager transactionManager)
          Sets the TransactionManager used to manage any transactions required by the underlying storage mechanism.
 void setWaitStrategy(com.lmax.disruptor.WaitStrategy waitStrategy)
          Sets the WaitStrategy to use when event processors need to wait for incoming events.
 void start()
          Starts the Saga Manager by starting the processor threads and subscribes it with the eventBus.
 void stop()
          Unsubscribes this Saga Manager from the event bus and stops accepting new events.
 void subscribe()
          Subscribe this instance with its configured component.
 void subscribeEventProcessingMonitor(EventProcessingMonitor monitor)
          Subscribes the given monitor.
 void unsubscribe()
          Unsubscribe this instance from its subscribed component.
 void unsubscribeEventProcessingMonitor(EventProcessingMonitor monitor)
          Unsubscribed the given monitor.
 
Methods inherited from class org.axonframework.saga.AbstractReplayAwareSagaManager
afterReplay, beforeReplay, onReplayFailed, setReplayable
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

AsyncAnnotatedSagaManager

@Deprecated
public AsyncAnnotatedSagaManager(EventBus eventBus,
                                            Class<? extends AbstractAnnotatedSaga>... sagaTypes)
Deprecated. use AsyncAnnotatedSagaManager(Class[]) and register with the event bus using EventBus.subscribe(org.axonframework.eventhandling.EventListener)

Initializes an Asynchronous Saga Manager using default values for the given sagaTypes to listen to events on the given eventBus.

After initialization, the SagaManager must be explicitly started using the start() method.

Parameters:
eventBus - The Event Bus from which the Saga Manager will process events
sagaTypes - The types of Saga this saga manager will process incoming events for

AsyncAnnotatedSagaManager

public AsyncAnnotatedSagaManager(Class<? extends AbstractAnnotatedSaga>... sagaTypes)
Initializes an Asynchronous Saga Manager using default values for the given sagaTypes.

After initialization, the SagaManager must be explicitly started using the start() method.

Parameters:
sagaTypes - The types of Saga this saga manager will process incoming events for

AsyncAnnotatedSagaManager

public AsyncAnnotatedSagaManager(ParameterResolverFactory parameterResolverFactory,
                                 Class<? extends AbstractAnnotatedSaga>... sagaTypes)
Initializes an Asynchronous Saga Manager using default values for the given sagaTypes.

After initialization, the SagaManager must be explicitly started using the start() method.

Parameters:
parameterResolverFactory - The parameter resolver factory to resolve parameters of annotated handlers
sagaTypes - The types of Saga this saga manager will process incoming events for
Method Detail

start

public void start()
Starts the Saga Manager by starting the processor threads and subscribes it with the eventBus. If the saga manager is already started, it is only re-subscribed to the event bus.


stop

public void stop()
Unsubscribes this Saga Manager from the event bus and stops accepting new events. The method is blocked until all scheduled events have been processed. Note that any manually provided Executors using (setExecutor(java.util.concurrent.Executor) are not shut down.

If the Saga Manager was already stopped, nothing happens.


unsubscribe

public void unsubscribe()
Description copied from interface: Subscribable
Unsubscribe this instance from its subscribed component.

Specified by:
unsubscribe in interface Subscribable

subscribe

public void subscribe()
Description copied from interface: Subscribable
Subscribe this instance with its configured component.

Specified by:
subscribe in interface Subscribable

handle

public void handle(EventMessage event)
Description copied from interface: SagaManager
Handles the event by passing it to all Saga instances that have an Association Value found in the given event.

Specified by:
handle in interface EventListener
Specified by:
handle in interface SagaManager
Parameters:
event - the event to handle

getTargetType

public Class<?> getTargetType()
Description copied from interface: EventListenerProxy
Returns the instance type that this proxy delegates all event handling to.

Specified by:
getTargetType in interface EventListenerProxy
Returns:
the instance type that this proxy delegates all event handling to

subscribeEventProcessingMonitor

public void subscribeEventProcessingMonitor(EventProcessingMonitor monitor)
Description copied from interface: EventProcessingMonitorSupport
Subscribes the given monitor. If the monitor is already subscribed, nothing happens.

Specified by:
subscribeEventProcessingMonitor in interface EventProcessingMonitorSupport
Parameters:
monitor - The monitor to subscribe

unsubscribeEventProcessingMonitor

public void unsubscribeEventProcessingMonitor(EventProcessingMonitor monitor)
Description copied from interface: EventProcessingMonitorSupport
Unsubscribed the given monitor. If the monitor was not subscribed, or was already unsubscribed, nothing happens.

Specified by:
unsubscribeEventProcessingMonitor in interface EventProcessingMonitorSupport
Parameters:
monitor - The monitor to unsubscribe

setExecutor

public void setExecutor(Executor executor)
Sets the executor that provides the threads for the processors. Note that you must ensure that this executor is capable of delivering all of the required threads at once. If that is not the case, the Saga Manager might hang while waiting for the executor to provide them. Must be set before the SagaManager is started.

By default, a thread is created for each processor.

Parameters:
executor - the executor that provides the threads for the processors
See Also:
setProcessorCount(int)

setSagaRepository

public void setSagaRepository(SagaRepository sagaRepository)
Sets the saga repository to store and load Sagas from. Must be set before the SagaManager is started.

Defaults to an in-memory repository.

Parameters:
sagaRepository - the saga repository to store and load Sagas from

setSagaFactory

public void setSagaFactory(SagaFactory sagaFactory)
Sets the SagaFactory responsible for creating new Saga instances when required. Must be set before the SagaManager is started.

Defaults to a GenericSagaFactory instance.

Parameters:
sagaFactory - the SagaFactory responsible for creating new Saga instances

setErrorHandler

public void setErrorHandler(ErrorHandler errorHandler)
Sets the ErrorHandler instance, which defines the behavior in case an error occurs while loading or invoking Sagas for an incoming Event.

Parameters:
errorHandler - the error handler to notify when an error occurs

setTransactionManager

public void setTransactionManager(TransactionManager transactionManager)
Sets the TransactionManager used to manage any transactions required by the underlying storage mechanism. Note that batch sizes set by this transaction manager are ignored. Must be set before the SagaManager is started.

By default, no transactions are managed.

Parameters:
transactionManager - the TransactionManager used to manage any transactions required by the underlying storage mechanism.

setProcessorCount

public void setProcessorCount(int processorCount)
Sets the number of processors (threads) to process events with. Ensure that the given executor is capable of processing this amount of concurrent tasks. Must be set before the SagaManager is started.

Defaults to 1.

Parameters:
processorCount - the number of processors (threads) to process events with

setStartTimeout

public void setStartTimeout(long startTimeout)
Sets the amount of time (in milliseconds) the AsyncSagaManager will wait for the async processors to be assigned a thread from the executor. This is used to ensure that the executor provides a thread for the processors, instead of queueing them. This typically occurs when using a thread pool with a core pool size smaller than the processorCount.

Must be set before calling start(). Defaults to 5000 (5 seconds).

Parameters:
startTimeout - the number of millis to wait for the processor to have been assigned a thread. Defaults to 5000 (5 seconds).

setBufferSize

public void setBufferSize(int bufferSize)
Sets the size of the processing buffer. This is equal to the amount of events that may awaiting for processing before the input is blocked. Must be set before the SagaManager is started.

Note that this value must be a power of 2.

Defaults to 512.

Parameters:
bufferSize - The size of the processing buffer. Must be a power of 2.

setWaitStrategy

public void setWaitStrategy(com.lmax.disruptor.WaitStrategy waitStrategy)
Sets the WaitStrategy to use when event processors need to wait for incoming events.

Defaults to a BlockingWaitStrategy.

Parameters:
waitStrategy - the WaitStrategy to use when event processors need to wait for incoming events

setCorrelationDataProvider

public void setCorrelationDataProvider(CorrelationDataProvider<? super EventMessage> correlationDataProvider)
Sets the CorrelationDataProvider which provides the Meta Data to assign to messages generated by the Sagas that are managed by this SagaManager instance.

Parameters:
correlationDataProvider - the instance that provides the information to attach as correlation data

setCorrelationDataProviders

public void setCorrelationDataProviders(List<? extends CorrelationDataProvider<? super EventMessage>> correlationDataProviders)
Sets the CorrelationDataProviders which provide the Meta Data to assign to messages generated by the Sagas that are managed by this SagaManager instance.

Parameters:
correlationDataProviders - a list of instances that provides the information to attach as correlation data


Copyright © 2010-2016. All Rights Reserved.