Class QueueMessageStream<M extends Message>

java.lang.Object
org.axonframework.messaging.core.QueueMessageStream<M>
Type Parameters:
M - The type of Message managed by this stream.
All Implemented Interfaces:
MessageStream<M>

public class QueueMessageStream<M extends Message> extends Object implements MessageStream<M>
MessageStream implementation that uses a Queue to make elements available to a consumer.
Since:
5.0.0
Author:
Allard Buijze
  • Constructor Details

    • QueueMessageStream

      public QueueMessageStream()
      Constructs an instance with an unbounded queue. Offering entries will always be possible, as long as memory permits.
    • QueueMessageStream

      public QueueMessageStream(@Nonnull BlockingQueue<MessageStream.Entry<M>> queue)
      Construct an instance with given queue as the underlying queue. Offering and consuming entries will depend on the semantics of the implementation of the queue.

      Note that delivery and consumption of entries is done through BlockingQueue.offer(Object) and Queue.poll(), respectively. This means that a queue must be available to buffer elements. Implementations of a TransferQueue typically don't have this, and will therefore not work.

      Parameters:
      queue - The queue to use to store entries in transit from producer to consumer.
  • Method Details

    • offer

      public boolean offer(@Nonnull M message, @Nonnull Context context)
      Add the given message and accompanying context available for reading by a consumer. Any callback that has been registered will be notified of the availability of a new entry.

      If the underling buffer has insufficient space to store the offered element, or if the stream has been closed, the method returns false.

      Parameters:
      message - The message to add to the queue.
      context - The context to accompany the message.
      Returns:
      true if the message was successfully buffered. Otherwise false.
    • complete

      public void complete()
      Marks the queue as completed, indicating to any consumer that no more entries will become available.

      Note that there is no validation on offering items whether the stream is completed. It is the caller's responsibility to ensure no Messages are offered after completion.

    • completeExceptionally

      public void completeExceptionally(@Nonnull Throwable error)
      Marks the queue as completed exceptionally caused by given error, indicating to any consumer that no more messages will become available.

      Note that there is no validation on offering items whether the stream is completed. It is the caller's responsibility to ensure no Messages are offered after completion.

      Parameters:
      error - The cause of the exceptional completion
    • onConsumeCallback

      public void onConsumeCallback(@Nonnull Runnable callback)
      Registers given callback to be invoked when entries have been consumed from the underlying queue. Any previously registered callback will be replaced.

      The given callback is also notified when the consumer has requested to close() this stream.

      Parameters:
      callback - The callback to invoke when entries are consumed.
    • next

      public Optional<MessageStream.Entry<M>> next()
      Description copied from interface: MessageStream
      Returns an Optional carrying the next entry from the stream, if such entry was available. If no entry was available for reading, this method returns an empty Optional.

      This method will never block for elements becoming available.

      Specified by:
      next in interface MessageStream<M extends Message>
      Returns:
      An optional carrying the next entry, if available.
    • setCallback

      public void setCallback(@Nonnull Runnable callback)
      Description copied from interface: MessageStream
      Registers the callback to invoke when entries are available for reading or when the stream completes (either normally or with an error). An invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.

      Any previously registered callback is replaced with the given callback.

      The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error.

      Specified by:
      setCallback in interface MessageStream<M extends Message>
      Parameters:
      callback - The callback to invoke when entries are available for reading, or the stream completes.
    • error

      public Optional<Throwable> error()
      Description copied from interface: MessageStream
      Indicates whether any error has been reported in this stream. Implementations may choose to not return any error here until all entries that were available for reading before any error occurred have been consumed.
      Specified by:
      error in interface MessageStream<M extends Message>
      Returns:
      An optional containing the possible error this stream completed with.
    • isCompleted

      public boolean isCompleted()
      Description copied from interface: MessageStream
      Indicates whether this stream has been completed. A completed stream will never return entries from MessageStream.next(), and MessageStream.hasNextAvailable() will always return false. If the stream completed with an error, MessageStream.error() will report so.
      Specified by:
      isCompleted in interface MessageStream<M extends Message>
      Returns:
      true if the stream completed, otherwise false.
    • hasNextAvailable

      public boolean hasNextAvailable()
      Description copied from interface: MessageStream
      Indicates whether an entry is available for immediate reading. When entries are reported available, there is no guarantee that MessageStream.next() will indeed return an entry. However, besides any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returns false.
      Specified by:
      hasNextAvailable in interface MessageStream<M extends Message>
      Returns:
      true when there are entries available for reading, false otherwise.
    • close

      public void close()
      Description copied from interface: MessageStream
      Closes this stream, freeing any possible resources occupied by the underlying stream. After invocation, some entries may still be available for reading.

      Implementations must always release resources when a stream is completed, either with an error or normally. Therefore, it is only required to close() a stream if the consumer decides to not read until the end.

      Specified by:
      close in interface MessageStream<M extends Message>
    • isClosed

      public boolean isClosed()
      Indicates whether this stream has been closed, either by completion or by explicit closing from the consumer.

      Unlike isCompleted(), this may also return true when there are still messages to consume

      Returns:
      true when closed or completed, otherwise false
    • peek

      public Optional<MessageStream.Entry<M>> peek()
      Description copied from interface: MessageStream
      Returns an Optional carrying the next entry from the stream (without moving the stream pointer), if such entry was available. If no entry was available for reading, this method returns an empty Optional.

      This method will never block for elements becoming available.

      Specified by:
      peek in interface MessageStream<M extends Message>
      Returns:
      An optional carrying the next entry, if available.