Class QueueMessageStream<M extends Message>

java.lang.Object
org.axonframework.messaging.core.AbstractMessageStream<M>
org.axonframework.messaging.core.QueueMessageStream<M>
Type Parameters:
M - The type of Message contained in this stream.
All Implemented Interfaces:
MessageStream<M>

public class QueueMessageStream<M extends Message> extends AbstractMessageStream<M>
A MessageStream implementation backed by a BlockingQueue.

This stream acts as a bridge between a producer and a consumer: entries are offered into an internal queue and consumed via the MessageStream API.

The stream supports both finite and dynamically produced sequences:

  • While open, the stream may temporarily have no elements available, in which case consumption methods may indicate a "not ready" state.
  • Once seal() or sealExceptionally(Throwable) is invoked, no further elements are accepted. Remaining buffered elements can still be consumed.
  • After the queue is drained, the stream completes normally or exceptionally depending on how it was sealed.

If a callback is registered via AbstractMessageStream.setCallback(Runnable), it is invoked when new elements become available. If the callback throws an exception, the stream transitions to an error state immediately and any buffered elements are discarded.

Since:
5.0.0
Author:
Allard Buijze, John Hendrikx
  • Constructor Details

    • QueueMessageStream

      public QueueMessageStream()
      Constructs a QueueMessageStream backed by an unbounded queue.

      Offering elements will succeed as long as sufficient memory is available.

    • QueueMessageStream

      public QueueMessageStream(BlockingQueue<MessageStream.Entry<M>> queue)
      Constructs a QueueMessageStream using the given queue as its underlying buffer.

      Both production and consumption semantics depend on the provided queue implementation. Elements are added using BlockingQueue.offer(Object) and consumed using Queue.poll().

      The queue must support buffering of elements. Implementations such as TransferQueue that rely on direct handoff without internal storage are not suitable.

      Parameters:
      queue - The queue used to buffer entries between producer and consumer.
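The buffering requirement can be illustrated with plain JDK queues. The sketch below does not use QueueMessageStream; the class QueueSuitabilityDemo and its helper bufferedCount are hypothetical names chosen for illustration. It uses the JDK's SynchronousQueue as an example of a queue with no internal capacity: a non-blocking offer fails when no consumer is waiting, so nothing is ever buffered. A bounded ArrayBlockingQueue, by contrast, accepts elements until it is full.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical demo class; uses only JDK queues, not the Axon API.
class QueueSuitabilityDemo {

    // Offers each element without blocking, then counts how many can be polled back.
    static int bufferedCount(BlockingQueue<String> queue, String... elements) {
        for (String element : elements) {
            queue.offer(element); // non-blocking; returns false if the element is not accepted
        }
        int count = 0;
        while (queue.poll() != null) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // No internal capacity: with no consumer waiting, every offer fails.
        System.out.println(bufferedCount(new SynchronousQueue<String>(), "a", "b", "c")); // prints 0
        // Capacity 2: the third offer is rejected, the first two stay buffered.
        System.out.println(bufferedCount(new ArrayBlockingQueue<String>(2), "a", "b", "c")); // prints 2
    }
}
```

With a bounded queue, a producer has to be prepared for offer to return false once the capacity is reached.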
  • Method Details

    • fetchNext

      Description copied from class: AbstractMessageStream
      Attempts to fetch the next available MessageStream.Entry from the underlying source.

      This method is invoked by AbstractMessageStream.next() when no previously peeked entry is available. Implementations must return an AbstractMessageStream.FetchResult describing the current state of the stream.

      Returning AbstractMessageStream.FetchResult.NotReady will transition the stream into an awaiting data state. Implementations must subsequently invoke AbstractMessageStream.signalProgress() when progress may be possible again (e.g., when new data arrives or the stream completes).

      Implementations must ensure that any state changes observable via this method are fully applied before invoking AbstractMessageStream.signalProgress(). A signal that arrives before the consumer has entered the awaiting state is not replayed; correctness relies on this method returning the updated state when the consumer calls it next. See AbstractMessageStream.signalProgress() for the full ordering contract.

      This method must be non-blocking. It should return immediately with the best available information about the stream's current state.

      Implementations must not attempt to complete or close the stream directly. Instead, they must return AbstractMessageStream.FetchResult.Completed or AbstractMessageStream.FetchResult.Error to signal termination.

      If an implementation throws an exception, the stream completes exceptionally with that exception.

      Specified by:
      fetchNext in class AbstractMessageStream<M extends Message>
      Returns:
      an AbstractMessageStream.FetchResult representing the outcome of the fetch attempt
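The non-blocking contract described above can be modelled with JDK types only. The following is a minimal sketch under stated assumptions, not the QueueMessageStream implementation: the class NonBlockingFetchSketch, its Outcome enum, and all method names are hypothetical stand-ins for the real FetchResult variants. It reads the sealed flag before polling, so an element added just before sealing is not mistaken for the end of the stream.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model of a non-blocking fetch; names do not come from the Axon API.
class NonBlockingFetchSketch {

    enum Outcome { VALUE, NOT_READY, COMPLETED, ERROR }

    private final Queue<String> buffer = new ConcurrentLinkedQueue<>();
    private volatile boolean isSealed;
    private volatile Throwable failure;
    private String lastValue;

    void produce(String value) {
        buffer.add(value);
    }

    // Pass null to seal normally, or a cause to seal exceptionally.
    void seal(Throwable errorOrNull) {
        failure = errorOrNull; // apply the state change first...
        isSealed = true;       // ...then publish it, before any signal would be sent
    }

    // Never blocks: reports the best available information and returns immediately.
    Outcome fetch() {
        boolean wasSealed = isSealed; // read BEFORE polling to avoid missing a late element
        String next = buffer.poll();
        if (next != null) {
            lastValue = next;
            return Outcome.VALUE;
        }
        if (!wasSealed) {
            return Outcome.NOT_READY; // the producer is expected to signal progress later
        }
        return failure == null ? Outcome.COMPLETED : Outcome.ERROR;
    }

    String lastValue() {
        return lastValue;
    }
}
```

In this model, termination is only ever reported through the return value, mirroring the rule that implementations return a terminal result rather than completing the stream themselves.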
    • offer

      public boolean offer(M message, Context context)
      Attempts to add the given message and context to this stream.

      If successful, the element becomes available for consumption and any registered callback is invoked to signal its availability.

      If the callback throws an exception, the stream transitions to an error state immediately. In that case, any buffered elements are discarded and further interaction with the stream will reflect the error.

      This method returns false if:

      • the stream has been sealed or sealed exceptionally,
      • the underlying queue cannot accept the element (e.g., a bounded queue is full), or
      • the stream has been closed by the consumer.
      Parameters:
      message - the message to add
      context - the context associated with the message
      Returns:
      true if the element was accepted, otherwise false.
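The rejection rules and the callback behavior above can be sketched as a small stand-alone model. This is not the library's code: OfferRulesSketch and its members are hypothetical. The return value after a failing callback and the rejection of offers once the stream has failed are assumptions of this sketch, since the documentation does not state them.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the offer rules; not the QueueMessageStream source.
class OfferRulesSketch {

    private final BlockingQueue<String> buffer;
    private final Runnable callback;
    private boolean isSealed;
    private boolean isClosed;
    private Throwable error;

    OfferRulesSketch(int capacity, Runnable callback) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.callback = callback;
    }

    synchronized boolean offer(String element) {
        if (isSealed || isClosed || error != null) {
            return false;       // sealed, closed by the consumer, or (assumption) already failed
        }
        if (!buffer.offer(element)) {
            return false;       // the bounded queue is full
        }
        try {
            callback.run();     // signal availability to the consumer
        } catch (RuntimeException e) {
            error = e;          // callback failure: enter the error state...
            buffer.clear();     // ...and discard buffered elements
        }
        return true;            // assumption: the element counted as accepted before the callback ran
    }

    synchronized void seal() {
        isSealed = true;
    }

    synchronized void close() {
        isClosed = true;
        buffer.clear();
    }

    synchronized int buffered() {
        return buffer.size();
    }

    synchronized Throwable error() {
        return error;
    }
}
```

A producer using such a stream would typically check the boolean result and decide whether to retry, drop the element, or stop producing.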
    • seal

      public void seal()
      Seals this stream, preventing any further elements from being added. The underlying queue may still contain elements; once these have been consumed, the stream completes.

      Any Messages offered after sealing will be rejected.

    • sealExceptionally

      public void sealExceptionally(Throwable error)
      Seals this stream exceptionally, indicating that no further elements will be added and that an error has occurred during production.

      Any Messages offered after this method is invoked will be rejected.

      Already buffered elements may still be consumed via AbstractMessageStream.next() or AbstractMessageStream.peek(). Once the queue is empty, the stream will complete and AbstractMessageStream.error() will report the provided Throwable.

      Parameters:
      error - the Throwable representing the error that caused the stream to fail
    • onCompleted

      protected final void onCompleted()
      Description copied from class: AbstractMessageStream
      Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally. Subclasses may override this method to perform custom actions on completion.

      If the implementation throws an exception, the stream still completes, but it will complete with the thrown exception. If the stream was about to complete with an error, and the callback fails as well, the exception is added as a suppressed exception.

      Overrides:
      onCompleted in class AbstractMessageStream<M extends Message>
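The completion rules for onCompleted can be modelled with the JDK's Throwable.addSuppressed. The helper below is a hypothetical sketch, not Axon code. It assumes that when both a pending error and a hook failure exist, the hook failure is attached as suppressed to the pending error, which is one reading of the description above.

```java
// Hypothetical helper modelling the completion rules; not Axon code.
class CompletionSketch {

    // Runs the completion hook and returns the error the stream would end with
    // (null means normal completion).
    static Throwable complete(Throwable pendingError, Runnable onCompleted) {
        try {
            onCompleted.run();
            return pendingError;                     // hook succeeded: outcome is unchanged
        } catch (RuntimeException hookFailure) {
            if (pendingError == null) {
                return hookFailure;                  // stream completes with the hook's exception
            }
            pendingError.addSuppressed(hookFailure); // assumption: original error stays primary
            return pendingError;
        }
    }
}
```

Either way, the stream still reaches a completed state; only the reported error differs.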