org.axonframework.eventhandling.amqp.spring
Class SpringAMQPTerminal

java.lang.Object
  extended by org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal
All Implemented Interfaces:
EventBusTerminal, org.springframework.beans.factory.Aware, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware

public class SpringAMQPTerminal
extends Object
implements EventBusTerminal, org.springframework.beans.factory.InitializingBean, org.springframework.context.ApplicationContextAware

EventBusTerminal implementation that uses an AMQP 0.9 compatible Message Broker to dispatch event messages. All outgoing messages are sent to a configured Exchange, which can be set using setExchange(Exchange) or setExchangeName(String).

This terminal does not dispatch Events internally, as it relies on each cluster to listen to its own AMQP Queue.

Since:
2.0
Author:
Allard Buijze

Constructor Summary
SpringAMQPTerminal()
           
 
Method Summary
 void afterPropertiesSet()
           
protected  void doSendMessage(com.rabbitmq.client.Channel channel, AMQPMessage amqpMessage)
          Does the actual publishing of the given body on the given channel.
 void onClusterCreated(Cluster cluster)
          Invoked when an Event Listener has been assigned to a cluster that was not yet known to the Event Bus.
 void publish(EventMessage... events)
          Publishes the given events to all clusters on the Event Bus.
 void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
           
 void setConnectionFactory(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory)
          Sets the ConnectionFactory providing the Connections and Channels to send messages on.
 void setDurable(boolean durable)
          Whether or not messages should be marked as "durable" when sending them out.
 void setExchange(org.springframework.amqp.core.Exchange exchange)
          Sets the exchange to dispatch published messages to.
 void setExchangeName(String exchangeName)
          Sets the name of the exchange to dispatch published messages to.
 void setListenerContainerLifecycleManager(ListenerContainerLifecycleManager listenerContainerLifecycleManager)
          Sets the ListenerContainerLifecycleManager that creates and manages the lifecycle of Listener Containers for the clusters that are connected to this terminal.
 void setMessageConverter(AMQPMessageConverter messageConverter)
          Sets the Message Converter that creates AMQP Messages from Event Messages and vice versa.
 void setPublisherAckTimeout(long publisherAckTimeout)
          Sets the maximum amount of time (in milliseconds) the publisher may wait for the acknowledgement of published messages.
 void setRoutingKeyResolver(RoutingKeyResolver routingKeyResolver)
          Sets the RoutingKeyResolver that provides the Routing Key for each message to dispatch.
 void setSerializer(Serializer serializer)
          Sets the serializer to serialize messages with when sending them to the Exchange.
 void setTransactional(boolean transactional)
          Whether this Terminal should dispatch its Events in a transaction or not.
 void setWaitForPublisherAck(boolean waitForPublisherAck)
          Enables or disables the RabbitMQ specific publisher acknowledgements (confirms).
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

SpringAMQPTerminal

public SpringAMQPTerminal()
Method Detail

publish

public void publish(EventMessage... events)
Description copied from interface: EventBusTerminal
Publishes the given events to all clusters on the Event Bus. The terminal is responsible for the delivery process, be it local or remote.

Specified by:
publish in interface EventBusTerminal
Parameters:
events - the collection of events to publish

doSendMessage

protected void doSendMessage(com.rabbitmq.client.Channel channel,
                             AMQPMessage amqpMessage)
                      throws IOException
Does the actual publishing of the given body on the given channel. This method can be overridden to change the properties used to send a message.

Parameters:
channel - The channel to dispatch the message on
amqpMessage - The AMQPMessage describing the characteristics of the message to publish
Throws:
IOException - when an error occurs while writing the message

onClusterCreated

public void onClusterCreated(Cluster cluster)
Description copied from interface: EventBusTerminal
Invoked when an Event Listener has been assigned to a cluster that was not yet known to the Event Bus. This method is invoked only once for each cluster that was assigned an Event Listener. Subsequent Event Listeners are added to the cluster. Clusters remain "live" even when all event listeners have been removed from them.

Specified by:
onClusterCreated in interface EventBusTerminal
Parameters:
cluster - the newly created cluster

afterPropertiesSet

public void afterPropertiesSet()
                        throws Exception
Specified by:
afterPropertiesSet in interface org.springframework.beans.factory.InitializingBean
Throws:
Exception

setTransactional

public void setTransactional(boolean transactional)
Whether this Terminal should dispatch its Events in a transaction or not. Defaults to false.

If a delegate Terminal is configured, the transaction will be committed after the delegate has dispatched the events.

Transactional behavior cannot be enabled if setWaitForPublisherAck(boolean) has been set to true.

Parameters:
transactional - whether dispatching should be transactional or not

setWaitForPublisherAck

public void setWaitForPublisherAck(boolean waitForPublisherAck)
Enables or disables the RabbitMQ specific publisher acknowledgements (confirms). When confirms are enabled, the terminal will wait until the server has acknowledged the reception (or fsync to disk on persistent messages) of all published messages.

Publisher acknowledgements cannot be enabled when transactions are enabled.

See RabbitMQ Documentation for more information about publisher acknowledgements.

Parameters:
waitForPublisherAck - whether or not to enable server acknowledgements (confirms)

setPublisherAckTimeout

public void setPublisherAckTimeout(long publisherAckTimeout)
Sets the maximum amount of time (in milliseconds) the publisher may wait for the acknowledgement of published messages. If not all messages have been acknowledged within this time, the publication will throw an EventPublicationFailedException.

This setting is only used when setWaitForPublisherAck(boolean) is set to true.

Parameters:
publisherAckTimeout - The number of milliseconds to wait for confirms, or 0 to wait indefinitely.
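
The timeout semantics described above (a bound in milliseconds, with 0 meaning "wait indefinitely") can be illustrated with a small stand-in. This is only a sketch and not how the terminal is implemented: the class AckTimeoutSketch and the method awaitAcks are hypothetical, and a CountDownLatch takes the place of the broker's acknowledgements.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in, not part of Axon: models "wait for acks with a timeout".
final class AckTimeoutSketch {

    /**
     * Waits until all pending acknowledgements have arrived.
     *
     * @param pendingAcks   latch counted down once per acknowledged message
     * @param timeoutMillis maximum wait in milliseconds, or 0 to wait indefinitely
     * @return true if everything was acknowledged in time, false otherwise
     */
    static boolean awaitAcks(CountDownLatch pendingAcks, long timeoutMillis) {
        try {
            if (timeoutMillis == 0) {
                pendingAcks.await(); // 0 means: no upper bound on the wait
                return true;
            }
            return pendingAcks.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch acked = new CountDownLatch(1);
        acked.countDown();                           // the single message was acknowledged
        System.out.println(awaitAcks(acked, 100));   // prints true

        CountDownLatch pending = new CountDownLatch(1);
        System.out.println(awaitAcks(pending, 50));  // prints false (timed out)
    }
}
```

In the terminal itself, the "not acknowledged in time" case is reported by throwing an EventPublicationFailedException rather than by returning false.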

setConnectionFactory

public void setConnectionFactory(org.springframework.amqp.rabbit.connection.ConnectionFactory connectionFactory)
Sets the ConnectionFactory providing the Connections and Channels to send messages on. The SpringAMQPTerminal does not cache or reuse connections. Providing a ConnectionFactory instance that caches connections prevents a new connection from being opened for each invocation of publish(org.axonframework.domain.EventMessage[]).

Defaults to an autowired Connection Factory.

Parameters:
connectionFactory - The connection factory to set
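
As a rough illustration of the setters on this page, a Spring Java configuration might wire the terminal as follows. This is a sketch under assumptions: the class name AmqpTerminalConfig, the host "localhost" and the exchange name "Example.Events" are made up for the example, and a Serializer and a ListenerContainerLifecycleManager are assumed to be defined elsewhere in the application context so that the autowired defaults can resolve.

```java
import org.axonframework.eventhandling.amqp.spring.SpringAMQPTerminal;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class; names and values are illustrative only.
@Configuration
public class AmqpTerminalConfig {

    @Bean
    public ConnectionFactory connectionFactory() {
        // A caching factory avoids opening a new connection on every publish(...)
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public SpringAMQPTerminal eventBusTerminal(ConnectionFactory connectionFactory) {
        SpringAMQPTerminal terminal = new SpringAMQPTerminal();
        terminal.setConnectionFactory(connectionFactory);
        terminal.setExchangeName("Example.Events"); // assumed exchange name
        terminal.setDurable(true);                  // same as the documented default
        // Transactions and publisher acknowledgements are mutually exclusive,
        // so at most one of setTransactional(true) / setWaitForPublisherAck(true)
        // should be used; this sketch leaves both at their defaults.
        return terminal;
    }
}
```

Because the terminal implements InitializingBean and ApplicationContextAware, Spring invokes setApplicationContext and afterPropertiesSet on the bean once the properties above have been set.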

setMessageConverter

public void setMessageConverter(AMQPMessageConverter messageConverter)
Sets the Message Converter that creates AMQP Messages from Event Messages and vice versa. Setting this property will ignore the "durable", "serializer" and "routingKeyResolver" properties, which just act as shorthands to create a DefaultAMQPMessageConverter instance.

Defaults to a DefaultAMQPMessageConverter.

Parameters:
messageConverter - The message converter to convert AMQP Messages to Event Messages and vice versa.

setDurable

public void setDurable(boolean durable)
Whether or not messages should be marked as "durable" when sending them out. Durable messages suffer from a performance penalty, but will survive a reboot of the Message broker that stores them.

By default, messages are durable.

Note that this setting is ignored if a MessageConverter is provided. In that case, the message converter must add the properties to reflect the required durability setting.

Parameters:
durable - whether or not messages should be durable

setSerializer

public void setSerializer(Serializer serializer)
Sets the serializer to serialize messages with when sending them to the Exchange.

Defaults to an autowired serializer, which requires exactly 1 eligible serializer to be present in the application context.

This setting is ignored if a MessageConverter is configured.

Parameters:
serializer - the serializer to serialize messages with

setRoutingKeyResolver

public void setRoutingKeyResolver(RoutingKeyResolver routingKeyResolver)
Sets the RoutingKeyResolver that provides the Routing Key for each message to dispatch. Defaults to a PackageRoutingKeyResolver, which uses the package name of the message's payload as a Routing Key.

This setting is ignored if a MessageConverter is configured.

Parameters:
routingKeyResolver - the RoutingKeyResolver to use
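
The idea of using the payload's package name as a routing key can be shown with plain Java. This is a simplified stand-in, not the PackageRoutingKeyResolver source: the class PackageRoutingKeySketch and the method routingKeyFor are hypothetical, and the method takes the payload object directly instead of an event message.

```java
// Hypothetical stand-in, not part of Axon: derives a routing key from a payload's package.
final class PackageRoutingKeySketch {

    /** Returns the package name of the payload's class, or "" if it has no named package. */
    static String routingKeyFor(Object payload) {
        Package pkg = payload.getClass().getPackage();
        return pkg == null ? "" : pkg.getName();
    }

    public static void main(String[] args) {
        System.out.println(routingKeyFor("some payload"));                     // prints java.lang
        System.out.println(routingKeyFor(new java.util.ArrayList<String>()));  // prints java.util
    }
}
```

With a scheme like this, queues can be bound to the exchange using patterns over package names, so that all events from one package reach the same consumers.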

setExchangeName

public void setExchangeName(String exchangeName)
Sets the name of the exchange to dispatch published messages to. Defaults to "".

Parameters:
exchangeName - the name of the exchange to dispatch messages to

setExchange

public void setExchange(org.springframework.amqp.core.Exchange exchange)
Sets the exchange to dispatch published messages to. Defaults to the exchange named "".

Parameters:
exchange - the exchange to dispatch messages to

setListenerContainerLifecycleManager

public void setListenerContainerLifecycleManager(ListenerContainerLifecycleManager listenerContainerLifecycleManager)
Sets the ListenerContainerLifecycleManager that creates and manages the lifecycle of Listener Containers for the clusters that are connected to this terminal.

Defaults to an autowired ListenerContainerLifecycleManager.

Parameters:
listenerContainerLifecycleManager - the listenerContainerLifecycleManager to set

setApplicationContext

public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
                           throws org.springframework.beans.BeansException
Specified by:
setApplicationContext in interface org.springframework.context.ApplicationContextAware
Throws:
org.springframework.beans.BeansException


Copyright © 2010-2016. All Rights Reserved.