Class MergedMessageStream<M extends Message>

java.lang.Object
org.axonframework.messaging.core.MergedMessageStream<M>
Type Parameters:
M - The type of Message in the stream
All Implemented Interfaces:
MessageStream<M>

public class MergedMessageStream<M extends Message> extends Object implements MessageStream<M>
A MessageStream implementation that merges two underlying message streams into a single stream. Messages from both streams are interleaved based on a provided Comparator, which determines the order in which messages are consumed.

The merged stream is considered completed only when both underlying streams are completed. If either stream has an error, that error is propagated through the merged stream, with errors from the first stream taking precedence. However, after one of the underlying streams reports an error, the merged stream can still be consumed for as long as the other underlying stream provides messages. Clients that wish to abort consuming messages when there is an error can close() the stream once error() returns a non-empty Optional.
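The completion and error rules above can be sketched with plain values. This is a minimal illustration, not the framework's implementation: the class MergedStateSketch and its two helper methods are hypothetical names, and the inputs stand in for what the two underlying streams would report.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; not part of Axon Framework.
class MergedStateSketch {

    /** The merged stream counts as completed only when both underlying streams are completed. */
    static boolean isCompleted(boolean firstCompleted, boolean secondCompleted) {
        return firstCompleted && secondCompleted;
    }

    /** An error from either stream is reported; the first stream's error takes precedence. */
    static Optional<Throwable> error(Optional<Throwable> firstError, Optional<Throwable> secondError) {
        return firstError.isPresent() ? firstError : secondError;
    }
}
```

With both errors present, the sketch returns the first stream's error; with only the second present, it returns that one.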

When a message is consumed via next(), the implementation peeks at the head of both streams and uses the comparator to decide which message to consume. This ensures messages are returned in the desired order without requiring either stream to be fully buffered.
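The peek-and-compare step described above can be sketched with two in-memory queues standing in for the underlying streams. The class PeekMergeSketch and its methods are hypothetical and exist only for illustration. How the real class behaves when one stream has no entry available but has not yet completed is not stated on this page; the sketch simply assumes the entry from the other source is returned.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for illustration only; not the Axon Framework implementation.
class PeekMergeSketch {

    /** Returns the next element across both queues, or empty if neither has one available. */
    static <T> Optional<T> next(Comparator<T> comparator, Deque<T> first, Deque<T> second) {
        T a = first.peekFirst();   // peek at both heads without consuming them
        T b = second.peekFirst();
        if (a == null && b == null) {
            return Optional.empty();
        }
        if (b == null) {
            return Optional.of(first.pollFirst());   // assumption: fall back to the other source
        }
        if (a == null) {
            return Optional.of(second.pollFirst());  // assumption: fall back to the other source
        }
        // A result of <= 0 means the head of the first source is consumed first.
        return Optional.of(comparator.compare(a, b) <= 0 ? first.pollFirst() : second.pollFirst());
    }

    /** Drains both queues through next(...) and returns the elements in merged order. */
    static <T> List<T> drain(Comparator<T> comparator, Deque<T> first, Deque<T> second) {
        List<T> merged = new ArrayList<>();
        Optional<T> entry;
        while ((entry = next(comparator, first, second)).isPresent()) {
            merged.add(entry.get());
        }
        return merged;
    }
}
```

Only the two head elements are inspected per call, so neither source needs to be buffered in full. The output is globally ordered only if each source is already ordered according to the comparator.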

Since:
5.1.0
Author:
Allard Buijze
  • Constructor Details

    • MergedMessageStream

      public MergedMessageStream(Comparator<MessageStream.Entry<M>> comparator, MessageStream<M> first, MessageStream<M> second)
      Constructs a MergedMessageStream that merges two message streams based on the given comparator.
      Parameters:
      comparator - The comparator used to determine the order in which messages from the two streams are consumed. A result of <= 0 means the message from the first stream is consumed first.
      first - The first message stream to merge.
      second - The second message stream to merge.
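      As a sketch of the comparator contract, the example below orders entries by a timestamp and shows that a tie selects the entry from the first stream. The Stamped type, the BY_TIMESTAMP comparator, and the pick method are hypothetical; the real comparator receives MessageStream.Entry<M> instances.

```java
import java.util.Comparator;

// Hypothetical entry type for illustration only; not part of Axon Framework.
class ComparatorSketch {

    static final class Stamped {
        final String source;
        final long timestamp;

        Stamped(String source, long timestamp) {
            this.source = source;
            this.timestamp = timestamp;
        }
    }

    /** Orders entries by timestamp only, so entries with equal timestamps compare as 0. */
    static final Comparator<Stamped> BY_TIMESTAMP = Comparator.comparingLong(s -> s.timestamp);

    /** Mirrors the documented rule: a result of <= 0 selects the entry from the first stream. */
    static Stamped pick(Comparator<Stamped> comparator, Stamped fromFirst, Stamped fromSecond) {
        return comparator.compare(fromFirst, fromSecond) <= 0 ? fromFirst : fromSecond;
    }
}
```

      If ties should favor the second stream instead, the comparator itself has to return a positive value for them.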
  • Method Details

    • next

      public Optional<MessageStream.Entry<M>> next()
      Description copied from interface: MessageStream
      Returns an Optional carrying the next entry from the stream, if such an entry is available. If no entry is available for reading, this method returns an empty Optional.

      This method will never block for elements becoming available.

      Specified by:
      next in interface MessageStream<M extends Message>
      Returns:
      An optional carrying the next entry, if available.
    • peek

      public Optional<MessageStream.Entry<M>> peek()
      Description copied from interface: MessageStream
      Returns an Optional carrying the next entry from the stream (without moving the stream pointer), if such an entry is available. If no entry is available for reading, this method returns an empty Optional.

      This method will never block for elements becoming available.

      Specified by:
      peek in interface MessageStream<M extends Message>
      Returns:
      An optional carrying the next entry, if available.
    • setCallback

      public void setCallback(Runnable callback)
      Description copied from interface: MessageStream
      Registers the callback to invoke when entries are available for reading or when the stream completes (either normally or with an error), with the intent that a consumer of this stream will read the available messages completely.

      This has the following implications:

      • When you register the callback on an already completed stream, the callback is invoked directly.
      • When you register the callback on a stream having entries available, the callback is invoked, and you must consume the existing entries.
      • A registered callback is invoked again when all entries were consumed and new entries arrive.
      • Depending on the implementation of the stream, your callback might be invoked again while you are still consuming entries. In these cases, it is your responsibility to synchronize the callback executions. Using MessageStream.reduce(Object, BiFunction) might be a better fit because it guarantees isolated execution of callbacks using a processingGate.

      Note that an invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.

      Any previously registered callback is replaced with the given callback.

      The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error, unless the callback was called to indicate completion.
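      The consumption pattern implied by these rules can be sketched as a callback that drains the stream with next() until it returns an empty Optional, guarded against overlapping invocations. Everything in the block is a hypothetical stand-in: QueueStream is a simplified queue-backed stream (not the MessageStream API), and Collector and demo() exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified stand-in for a message stream; not the Axon Framework API.
class CallbackDrainSketch {

    static class QueueStream<T> {
        private final Queue<T> entries = new ConcurrentLinkedQueue<>();
        private volatile Runnable callback = () -> { };

        /** Replaces any previous callback and invokes it directly if entries are already available. */
        void setCallback(Runnable callback) {
            this.callback = callback;
            if (!entries.isEmpty()) {
                callback.run();
            }
        }

        /** Never blocks: returns an empty Optional when nothing is available. */
        Optional<T> next() {
            return Optional.ofNullable(entries.poll());
        }

        /** Producer side: adds an entry and signals the consumer. */
        void publish(T entry) {
            entries.add(entry);
            callback.run();
        }
    }

    /** Collects entries; drain is synchronized because callback invocations may overlap. */
    static class Collector<T> {
        private final List<T> received = new ArrayList<>();

        synchronized void drain(QueueStream<T> stream) {
            Optional<T> entry;
            // A callback invocation is only a hint, so stop when next() returns empty.
            while ((entry = stream.next()).isPresent()) {
                received.add(entry.get());
            }
        }

        synchronized List<T> received() {
            return new ArrayList<>(received);
        }
    }

    static List<String> demo() {
        QueueStream<String> stream = new QueueStream<>();
        Collector<String> collector = new Collector<>();
        stream.publish("a");                               // available before a callback is registered
        stream.setCallback(() -> collector.drain(stream)); // invoked directly, drains "a"
        stream.publish("b");                               // callback is invoked again for the new entry
        return collector.received();
    }
}
```

      The drain loop does very little per entry, in line with the advice to keep work on the callback thread to a minimum; heavier processing would typically be handed off elsewhere.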

      Specified by:
      setCallback in interface MessageStream<M extends Message>
      Parameters:
      callback - The callback to invoke when entries are available for reading, or the stream completes.
    • error

      public Optional<Throwable> error()
      Description copied from interface: MessageStream
      Indicates whether any error has been reported in this stream. Implementations may choose to not return any error here until all entries that were available for reading before any error occurred have been consumed.
      Specified by:
      error in interface MessageStream<M extends Message>
      Returns:
      An optional containing the possible error this stream completed with.
    • isCompleted

      public boolean isCompleted()
      Description copied from interface: MessageStream
      Indicates whether this stream has been completed. A completed stream will never return entries from MessageStream.next(), and MessageStream.hasNextAvailable() will always return false. If the stream completed with an error, MessageStream.error() will report so.
      Specified by:
      isCompleted in interface MessageStream<M extends Message>
      Returns:
      true if the stream completed, otherwise false.
    • hasNextAvailable

      public boolean hasNextAvailable()
      Description copied from interface: MessageStream
      Indicates whether an entry is available for immediate reading. When entries are reported available, there is no guarantee that MessageStream.next() will indeed return an entry. However, barring any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returns false.
      Specified by:
      hasNextAvailable in interface MessageStream<M extends Message>
      Returns:
      true when there are entries available for reading, false otherwise.
    • close

      public void close()
      Description copied from interface: MessageStream
      Closes this stream, indicating that the consumer is no longer interested in receiving further entries.

      This is a consumer-driven cancellation operation. It immediately transitions the stream into a terminal state and discards any remaining or buffered entries, including any peeked entries.

      This method is intended exclusively for consumer-side cancellation. It signals that no further processing is required and allows implementations to release all resources immediately.

      This operation is fundamentally different from producer-side completion. When an implementation determines that no further elements will ever become available, it must not call close(). Instead, it must:

      • stop producing new elements,
      • allow any buffered elements to be consumed normally, and
      • transition the stream to completion using its normal terminal signaling mechanism (e.g. completion or error through next()/fetchNext()).

      Calling this method is only required when the consumer chooses not to fully consume the stream. Streams must always release resources when they complete, regardless of whether close() is invoked.
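      A consumer that stops early, for example because error() reports a failure, might use close() as sketched below. FakeStream is a hypothetical, simplified stand-in (not the MessageStream API) that starts reporting an error after a configurable number of reads; consumeUntilError is likewise an illustrative name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-in for a message stream; not the Axon Framework API.
class CloseOnErrorSketch {

    static class FakeStream {
        private final Deque<String> entries = new ArrayDeque<>();
        private final int failAfter;   // number of reads after which an error is reported; -1 for never
        private int reads;
        private boolean closed;

        FakeStream(List<String> initial, int failAfter) {
            entries.addAll(initial);
            this.failAfter = failAfter;
        }

        Optional<String> next() {
            if (closed) {
                return Optional.empty();
            }
            String entry = entries.pollFirst();
            if (entry != null) {
                reads++;
            }
            return Optional.ofNullable(entry);
        }

        Optional<Throwable> error() {
            return failAfter >= 0 && reads >= failAfter
                    ? Optional.of(new IllegalStateException("simulated failure"))
                    : Optional.empty();
        }

        /** Consumer-side cancellation: discards whatever is still buffered. */
        void close() {
            closed = true;
            entries.clear();
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** Reads entries, but closes the stream as soon as an error is reported. */
    static List<String> consumeUntilError(FakeStream stream) {
        List<String> consumed = new ArrayList<>();
        while (true) {
            if (stream.error().isPresent()) {
                stream.close();   // abort: the consumer is no longer interested in further entries
                break;
            }
            Optional<String> entry = stream.next();
            if (!entry.isPresent()) {
                break;            // nothing available right now; no close() needed
            }
            consumed.add(entry.get());
        }
        return consumed;
    }
}
```

      When the stand-in never reports an error, the loop reads every entry and never calls close(), matching the note that closing is only required when the consumer does not fully consume the stream.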

      Specified by:
      close in interface MessageStream<M extends Message>