org.axonframework.eventhandling.replay
Class BackloggingIncomingMessageHandler

java.lang.Object
  extended by org.axonframework.eventhandling.replay.BackloggingIncomingMessageHandler
All Implemented Interfaces:
IncomingMessageHandler

public class BackloggingIncomingMessageHandler
extends Object
implements IncomingMessageHandler

IncomingMessageHandler implementation that maintains a backlog of all Events published to a cluster while it is in replay mode. When the replay finishes, events in the backlog are processed. Events that have been replayed will be removed from the backlog, to prevent duplicate publication of events.

When a replay fails, the backlog is cleared. This means backlogged items will not be forwarded to the cluster for handling.

Note that a single BackloggingIncomingMessageHandler should *not* be used for multiple Clusters. Each cluster must have its own instance of BackloggingIncomingMessageHandler.

Since:
2.0
Author:
Allard Buijze

Constructor Summary
BackloggingIncomingMessageHandler()
          Creates a new BackloggingIncomingMessageHandler.
BackloggingIncomingMessageHandler(org.joda.time.Duration backlogThresholdMargin)
          Creates a new BackloggingIncomingMessageHandler.
BackloggingIncomingMessageHandler(org.joda.time.Duration backlogThresholdMargin, Queue<EventMessage> backlog)
          Creates a new BackloggingIncomingMessageHandler.
 
Method Summary
 List<EventMessage> onIncomingMessages(Cluster destination, EventMessage... messages)
          Invoked while the ReplayingCluster is in replay mode and an Event is being dispatched to the Cluster.
 void onReplayFailed(Cluster destination, Throwable cause)
          Invoked when a replay has failed.
 void prepareForReplay(Cluster destination)
          Invoked just before replay mode is activated.
 void processBacklog(Cluster destination)
          Invoked when all events from the Event Store have been processed.
 List<EventMessage> releaseMessage(Cluster destination, DomainEventMessage message)
          Invoked when a message has been replayed from the event store.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

BackloggingIncomingMessageHandler

public BackloggingIncomingMessageHandler()
Creates a new BackloggingIncomingMessageHandler. Event Messages that have been generated more than 5 seconds before the start of the replay are not placed in the backlog, as they are assumed to be processed by the replay process. If the latency of the replayed cluster is expected to be more than 5 seconds, use the BackloggingIncomingMessageHandler(org.joda.time.Duration) constructor to provide a margin that better suits the latency.


BackloggingIncomingMessageHandler

public BackloggingIncomingMessageHandler(org.joda.time.Duration backlogThresholdMargin)
Creates a new BackloggingIncomingMessageHandler. The given backlogThresholdMargin indicates the age that an event may have to be backlogged. Older events will not be backlogged, as we may assume they will be part of the replay process.

This margin is also used to optimize the lookup of Events in the backlog. Events are only looked up in the portion of the queue that contains events with a timestamp near that of the event being looked for.

A good starting value for this is the maximum expected latency of incoming events.

Parameters:
backlogThresholdMargin - The margin of time to take into account when backlogging events.

BackloggingIncomingMessageHandler

public BackloggingIncomingMessageHandler(org.joda.time.Duration backlogThresholdMargin,
                                         Queue<EventMessage> backlog)
Creates a new BackloggingIncomingMessageHandler. The given backlogThresholdMargin indicates the age that an event may have to be backlogged. Older events will not be backlogged, as we may assume they will be part of the replay process. The given backlog queue is used to store backlogged events. Note that the backlog queue should support concurrent modification while the BackloggingIncomingMessageHandler is iterating over the queue.

This margin is also used to optimize the lookup of Events in the backlog. Events are only looked up in the portion of the queue that contains events with a timestamp near that of the event being looked for.

A good starting value for this is the maximum expected latency of incoming events.

Parameters:
backlogThresholdMargin - The margin of time to take into account when backlogging events.
backlog - The concurrent queue instance used to store backlogged events.
Method Detail

prepareForReplay

public void prepareForReplay(Cluster destination)
Description copied from interface: IncomingMessageHandler
Invoked just before replay mode is activated. Any messages passed to IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]) prior to this method invocation should be dispatched immediately to the destination cluster to prevent message loss.

This method is invoked in the thread that executes the replay process.

Specified by:
prepareForReplay in interface IncomingMessageHandler
Parameters:
destination - The cluster on which events are about te be replayed

onIncomingMessages

public List<EventMessage> onIncomingMessages(Cluster destination,
                                             EventMessage... messages)
Description copied from interface: IncomingMessageHandler
Invoked while the ReplayingCluster is in replay mode and an Event is being dispatched to the Cluster. If the timestamp of the given message is before the timestamp of any message reported via IncomingMessageHandler.releaseMessage(org.axonframework.eventhandling.Cluster, org.axonframework.domain.DomainEventMessage), consider discarding the incoming message.

This method returns the list of messages that must be considered as handled. May be null to indicate all given messages have been stored for processing later.

This method is invoked in the thread that attempts to publish the given messages to the given destination.

Specified by:
onIncomingMessages in interface IncomingMessageHandler
Parameters:
destination - The cluster to receive the message
messages - The messages to dispatch to the cluster
Returns:
a list of messages that may be considered as handled

releaseMessage

public List<EventMessage> releaseMessage(Cluster destination,
                                         DomainEventMessage message)
Description copied from interface: IncomingMessageHandler
Invoked when a message has been replayed from the event store. If such a message has been received with IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]), it should be discarded.

After this invocation, any invocation of IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]) with a message who's timestamp (minus a safety buffer to account for clock differences) is lower that this message's timestamp can be safely discarded. It is recommended that non-Domain EventMessages in the backlog are forwarded to the cluster provided, instead of discarded. They must then also be included in the returned list.

This method returns the list of EventMessages that must be considered processed, regardless of whether they have been forwarded to the original destination or not. These EventMessages have been registered in a call to IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]).

It is highly recommended to return the instance used in the IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]) invocation, over the given message, even if they refer to the save Event.

This method is invoked in the thread that executes the replay process

Specified by:
releaseMessage in interface IncomingMessageHandler
Parameters:
destination - The original destination of the message to be released
message - The message replayed from the event store
Returns:
The list of messages that have been released

onReplayFailed

public void onReplayFailed(Cluster destination,
                           Throwable cause)
Description copied from interface: IncomingMessageHandler
Invoked when a replay has failed. Typically, this means the state of the cluster's backing data source cannot be guaranteed, and the replay should be retried.

Specified by:
onReplayFailed in interface IncomingMessageHandler
Parameters:
destination - The destination cluster to dispatch backlogged messages to, if appropriate in this scenario
cause - The cause of the failure

processBacklog

public void processBacklog(Cluster destination)
Description copied from interface: IncomingMessageHandler
Invoked when all events from the Event Store have been processed. Any remaining backlog, as well as any messages received through IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]) should be dispatched to the given delegate. Transactions started by the replay process have been committed or rolled back prior to the invocation of this method.

Note that IncomingMessageHandler.onIncomingMessages(org.axonframework.eventhandling.Cluster, org.axonframework.domain.EventMessage[]) may be invoked during or after the invocation of this method. These messages must be dispatched by this handler to prevent message loss.

This method is invoked in the thread that executes the replay process

Specified by:
processBacklog in interface IncomingMessageHandler
Parameters:
destination - The destination cluster to dispatch backlogged messages to


Copyright © 2010-2016. All Rights Reserved.