Class AbstractMessageStream<M extends Message>

java.lang.Object
org.axonframework.messaging.core.AbstractMessageStream<M>
Type Parameters:
M - the type of Message contained in the entries of this stream
All Implemented Interfaces:
MessageStream<M>
Direct Known Subclasses:
AbstractQueryResponseMessageStream, CloseCallbackMessageStream, ContinuousMessageStream, QueueMessageStream

@Internal public abstract class AbstractMessageStream<M extends Message> extends Object implements MessageStream<M>
Abstract base implementation of MessageStream that provides full state management for consumption, completion, error propagation, closing, and callback coordination.

This implementation enforces a non-blocking, pull-based consumption model with optional callback-driven signaling. Consumers invoke next() to attempt to retrieve entries. When no entry is available at that time, the stream may enter an awaiting data state. In this state, the stream expects an external signal indicating that progress may be possible again.

Implementations must provide data through fetchNext(), which returns an AbstractMessageStream.FetchResult (Value, NotReady, Completed, or Error) describing the current availability.

Returning AbstractMessageStream.FetchResult.NotReady transitions the stream into an awaiting data state. Implementations are expected to invoke signalProgress() when new data may be available, or when the stream completes or fails asynchronously. This will trigger the registered callback if the consumer is awaiting data. It is safe to invoke signalProgress() even if the stream is not awaiting data; such calls are ignored.

The registered callback (see setCallback(Runnable)) is invoked when:

  • entries may be available for consumption, or
  • the stream completes (normally or exceptionally).
Invocation of the callback does not guarantee that entries are available; consumers must always re-check the stream using next(), peek(), isCompleted(), and error().

This class enforces a strict non-blocking contract: fetchNext() must never block. If no data is currently available, implementations must return AbstractMessageStream.FetchResult.NotReady and rely on signalProgress() to indicate future availability.
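
The pull-based model described above can be illustrated with a minimal, self-contained sketch. SketchStream, SketchQueueStream, Fetch and Kind are hypothetical stand-ins written for this example; they are not the Axon Framework types and leave out peeking, error handling and the callback-on-registration rules. The sketch only shows how a NotReady result arms the callback and how signalProgress() wakes a consumer that then re-checks the stream with next().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in for the documented contract; NOT the Axon Framework class.
abstract class SketchStream<T> {
    enum Kind { VALUE, NOT_READY, COMPLETED }

    static final class Fetch<T> {
        final Kind kind;
        final T value; // only set for VALUE
        Fetch(Kind kind, T value) { this.kind = kind; this.value = value; }
    }

    private Runnable callback = () -> { };
    private boolean awaiting;
    private boolean completed;

    // Must return immediately; NOT_READY means "nothing right now".
    protected abstract Fetch<T> fetchNext();

    final synchronized void setCallback(Runnable callback) { this.callback = callback; }

    final synchronized boolean isCompleted() { return completed; }

    final synchronized Optional<T> next() {
        if (completed) return Optional.empty();
        Fetch<T> result = fetchNext();
        awaiting = result.kind == Kind.NOT_READY;
        completed = result.kind == Kind.COMPLETED;
        return result.kind == Kind.VALUE ? Optional.of(result.value) : Optional.empty();
    }

    // Runs the callback only if the consumer is awaiting data; otherwise ignored.
    protected final void signalProgress() {
        Runnable toRun;
        synchronized (this) {
            if (!awaiting) return;
            awaiting = false;
            toRun = callback;
        }
        toRun.run(); // outside the lock, so the callback can call next()
    }
}

final class SketchQueueStream extends SketchStream<String> {
    private final Queue<String> queue = new ArrayDeque<>();
    private boolean done;

    @Override
    protected Fetch<String> fetchNext() { // called from next(), which holds the lock
        String head = queue.poll();
        if (head != null) return new Fetch<>(Kind.VALUE, head);
        return new Fetch<>(done ? Kind.COMPLETED : Kind.NOT_READY, null);
    }

    void offer(String entry) {
        synchronized (this) { queue.add(entry); } // state change first...
        signalProgress();                         // ...then the notification
    }

    void complete() {
        synchronized (this) { done = true; }
        signalProgress();
    }

    static List<String> demo() {
        SketchQueueStream stream = new SketchQueueStream();
        List<String> seen = new ArrayList<>();
        stream.setCallback(() -> {
            Optional<String> entry;
            while ((entry = stream.next()).isPresent()) seen.add(entry.get());
        });
        stream.next();      // nothing buffered: the stream starts awaiting data
        stream.offer("a");
        stream.offer("b");
        stream.complete();
        return seen;
    }
}
```

In this sketch the callback drains the stream until next() returns an empty Optional, which either re-arms the awaiting state or observes completion. That is the re-check loop the contract asks consumers to perform.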

Stream completion is terminal. Once an AbstractMessageStream.FetchResult.Completed or AbstractMessageStream.FetchResult.Error is returned, the stream will not produce further entries. Implementations must signal completion exclusively via fetchNext(); they should not invoke close() directly unless acting as a consumer of another (sub)stream they own.

If the registered callback throws an exception, the stream is completed exceptionally with that error, unless the callback was called to signal completion. This ensures that failures in progress signaling do not leave the stream in an unusable state.

Implementations may optionally initialize the stream state during construction using initialize(FetchResult). If not invoked explicitly, the stream is implicitly initialized on first interaction with an AbstractMessageStream.FetchResult.NotReady state. Initialization may only occur once and cannot use AbstractMessageStream.FetchResult.Value.

This class is thread-safe; all state transitions are guarded to support concurrent interaction between producers and consumers.

Subclasses may override onCompleted() to perform cleanup when the stream reaches a terminal state.

All methods in this class are either final, private, abstract or empty to protect its invariants.

Since:
5.1.0
Author:
Jan Galinski, John Hendrikx
  • Constructor Details

    • AbstractMessageStream

      public AbstractMessageStream()
  • Method Details

    • initialize

      protected final void initialize(AbstractMessageStream.FetchResult<MessageStream.Entry<M>> initialFetchResult)
      Sets the initial state of the stream. A subtype may call this method in its constructor, after the super constructor call completes. It may only be called during construction, and throws an exception if the stream already has a valid state.
      Parameters:
      initialFetchResult - the initial fetch result; must not be null and must not be a Value
      Throws:
      NullPointerException - if initialFetchResult is null
      IllegalArgumentException - if initialFetchResult is a Value
      IllegalStateException - if already initialized
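
The documented preconditions can be sketched as follows. InitializeSketch and its Kind enum are hypothetical stand-ins, and the order in which the checks run is an assumption; only the three exception types and their triggers come from the description above.

```java
import java.util.Objects;

// Hypothetical stand-in; it mirrors only the documented preconditions.
final class InitializeSketch {
    enum Kind { VALUE, NOT_READY, COMPLETED, ERROR }

    private Kind state; // null means "not yet initialized"

    void initialize(Kind initialFetchResult) {
        // Assumed check order: null, then Value, then repeated initialization.
        Objects.requireNonNull(initialFetchResult, "initialFetchResult");
        if (initialFetchResult == Kind.VALUE) {
            throw new IllegalArgumentException("initial state cannot be a Value");
        }
        if (state != null) {
            throw new IllegalStateException("already initialized");
        }
        state = initialFetchResult;
    }

    // Returns the simple name of the exception thrown by the action, or "none".
    static String outcome(Runnable action) {
        try {
            action.run();
            return "none";
        } catch (RuntimeException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```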
    • setCallback

      public final void setCallback(Runnable callback)
      Description copied from interface: MessageStream
      Registers the callback to invoke when entries are available for reading or when the stream completes (either normally or with an error), with the intent that a consumer of this stream will read the available messages completely.

      This has the following implications:

      • When you register the callback on an already completed stream, the callback is invoked directly.
      • When you register the callback on a stream having entries available, the callback is invoked, and you must consume the existing entries.
      • A registered callback is invoked again when all entries were consumed and new entries arrive.
      • Depending on the implementation of the stream, your callback might be invoked again while you are still consuming entries; in these cases, it is your responsibility to synchronize the callback executions. Using MessageStream.reduce(Object, BiFunction) might be a better fit because it guarantees isolated execution of callbacks using a processingGate.

      Note that an invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.

      Any previously registered callback is replaced with the given callback.

      The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error, unless the callback was called to indicate completion.

      Specified by:
      setCallback in interface MessageStream<M extends Message>
      Parameters:
      callback - The callback to invoke when entries are available for reading, or the stream completes.
    • close

      public final void close()
      Closes this stream, indicating that the consumer is no longer interested in receiving further entries.

      This is a consumer-driven cancellation operation. It immediately transitions the stream into a terminal state and discards any remaining or buffered entries, including any peeked entries. After this call returns:

      This method is intended exclusively for consumer-side cancellation. It signals that no further processing is required and allows implementations to release all resources immediately.

      This operation is fundamentally different from producer-side completion. When an implementation determines that no further elements will ever become available, it must not call close(). Instead, it must:

      • stop producing new elements,
      • allow any buffered elements to be consumed normally, and
      • transition the stream to completion using its normal terminal signaling mechanism (e.g. completion or error through next()/fetchNext()).

      Calling this method is only required when the consumer chooses not to fully consume the stream. Streams must always release resources when they complete, regardless of whether close() is invoked.

      This method is intended to be invoked by consumers to indicate loss of interest. Implementations should not call this method directly, unless they are acting as a consumer of another (sub)stream they own.

      Specified by:
      close in interface MessageStream<M extends Message>
    • signalProgress

      protected final void signalProgress()
      Signals that the stream may have made progress.

      This method should be invoked by implementations when an external event occurs that may allow the consumer to make progress. This includes, but is not limited to:

      • new entries becoming available,
      • the stream completing normally, or
      • the stream failing with an error.

      If the stream is currently awaiting data (i.e., a previous call to next() returned no entry due to AbstractMessageStream.FetchResult.NotReady), this method invokes the registered callback. Otherwise, the call is safe but has no effect.

      Ordering requirement

      Implementations must ensure that any state changes that make progress observable via fetchNext() are fully applied before invoking this method.

      In other words, signalProgress() must only be called after the stream's internal state has been updated in a way that a subsequent fetchNext() call can observe.

      Failing to observe this ordering may result in a lost wake-up scenario, in which the consumer is not notified again even though data is available.

      This method is therefore strictly a notification mechanism, not a state publication mechanism. Correctness must come from state visibility in fetchNext(), not from the timing of this signal.
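
The ordering requirement can be made concrete with a deliberately single-threaded sketch. OrderingSketch is a hypothetical model, not the Axon Framework implementation: drain() plays the role of a consumer callback that reads everything visible and then returns to the awaiting state, and signalProgress() only notifies when the consumer is awaiting. Publishing the entry before signaling delivers it; signaling first leaves the entry stranded because no later signal arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-threaded model of the signaling order; no locking is shown.
final class OrderingSketch {
    private final Queue<String> buffer = new ArrayDeque<>();
    private final List<String> consumed = new ArrayList<>();
    private boolean awaiting;

    // Consumer side: read everything currently visible, then await again.
    private void drain() {
        String entry;
        while ((entry = buffer.poll()) != null) consumed.add(entry);
        awaiting = true;
    }

    // Notification only: it carries no data and is ignored unless awaiting.
    private void signalProgress() {
        if (!awaiting) return;
        awaiting = false;
        drain();
    }

    static List<String> publishThenSignal() {
        OrderingSketch sketch = new OrderingSketch();
        sketch.drain();             // consumer finds nothing and awaits
        sketch.buffer.add("a");     // 1. make the state change visible
        sketch.signalProgress();    // 2. notify
        return sketch.consumed;     // the entry was delivered
    }

    static List<String> signalThenPublish() {
        OrderingSketch sketch = new OrderingSketch();
        sketch.drain();             // consumer finds nothing and awaits
        sketch.signalProgress();    // callback runs, sees an empty buffer, awaits again
        sketch.buffer.add("a");     // no further signal follows: the entry is stranded
        return sketch.consumed;     // nothing was delivered
    }
}
```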

    • error

      public final Optional<Throwable> error()
      Description copied from interface: MessageStream
      Indicates whether any error has been reported in this stream. Implementations may choose to not return any error here until all entries that were available for reading before any error occurred have been consumed.
      Specified by:
      error in interface MessageStream<M extends Message>
      Returns:
      An optional containing the possible error this stream completed with.
    • isCompleted

      public final boolean isCompleted()
      Description copied from interface: MessageStream
      Indicates whether this stream has been completed. A completed stream will never return entries from MessageStream.next(), and MessageStream.hasNextAvailable() will always return false. If the stream completed with an error, MessageStream.error() will report so.
      Specified by:
      isCompleted in interface MessageStream<M extends Message>
      Returns:
      true if the stream completed, otherwise false.
    • hasNextAvailable

      public final boolean hasNextAvailable()
      Description copied from interface: MessageStream
      Indicates whether an entry is available for immediate reading. When entries are reported available, there is no guarantee that MessageStream.next() will indeed return an entry. However, barring any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returns false.
      Specified by:
      hasNextAvailable in interface MessageStream<M extends Message>
      Returns:
      true when there are entries available for reading, false otherwise.
    • peek

      public final Optional<MessageStream.Entry<M>> peek()
      Description copied from interface: MessageStream
      Returns an Optional carrying the next entry from the stream (without moving the stream pointer), if such an entry is available. If no entry is available for reading, this method returns an empty Optional.

      This method never blocks waiting for elements to become available.

      Specified by:
      peek in interface MessageStream<M extends Message>
      Returns:
      An optional carrying the next entry, if available.
    • next

      public final Optional<MessageStream.Entry<M>> next()
      Description copied from interface: MessageStream
      Returns an Optional carrying the next entry from the stream, if such an entry is available. If no entry is available for reading, this method returns an empty Optional.

      This method never blocks waiting for elements to become available.

      Specified by:
      next in interface MessageStream<M extends Message>
      Returns:
      An optional carrying the next entry, if available.
    • fetchNext

      protected abstract AbstractMessageStream.FetchResult<MessageStream.Entry<M>> fetchNext()
      Attempts to fetch the next available MessageStream.Entry from the underlying source.

      This method is invoked by next() when no previously peeked entry is available. Implementations must return an AbstractMessageStream.FetchResult (Value, NotReady, Completed, or Error) describing the current state of the stream.

      Returning AbstractMessageStream.FetchResult.NotReady will transition the stream into an awaiting data state. Implementations must subsequently invoke signalProgress() when progress may be possible again (e.g., when new data arrives or the stream completes).

      Implementations must ensure that any state changes observable via this method are fully applied before invoking signalProgress(). A signal that arrives before the consumer has entered the awaiting state is not replayed; correctness relies on this method returning the updated state when the consumer calls it next. See signalProgress() for the full ordering contract.

      This method must be non-blocking. It should return immediately with the best available information about the stream's current state.

      Implementations must not attempt to complete or close the stream directly. Instead, they must return AbstractMessageStream.FetchResult.Completed or AbstractMessageStream.FetchResult.Error to signal termination.

      If an implementation throws an exception, the stream completes exceptionally with that exception.

      Returns:
      an AbstractMessageStream.FetchResult representing the outcome of the fetch attempt
    • onCompleted

      protected void onCompleted()
      Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally. Subclasses may override this method to perform custom actions on completion.

      If the implementation throws an exception, the stream still completes, but it will complete with the thrown exception. If the stream was about to complete with an error, and the callback fails as well, the exception is added as a suppressed exception.
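
The error-combining rule above can be sketched with Throwable.addSuppressed. CompletionSketch and resolveTerminalError are hypothetical names used only for illustration; the actual class may combine the errors differently internally.

```java
// Hypothetical helper illustrating the documented rule; not the Axon Framework code.
final class CompletionSketch {

    // pendingError is null for a normal completion.
    // Returns the error the stream would end up completing with, or null.
    static Throwable resolveTerminalError(Throwable pendingError, Runnable onCompleted) {
        try {
            onCompleted.run();
            return pendingError;              // hook succeeded: outcome unchanged
        } catch (RuntimeException hookFailure) {
            if (pendingError == null) {
                return hookFailure;           // normal completion becomes exceptional
            }
            pendingError.addSuppressed(hookFailure); // keep the original error primary
            return pendingError;
        }
    }
}
```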

    • toString

      public String toString()
      Returns a structured diagnostic representation of this MessageStream.

      The output is designed for debugging complex stream compositions and focuses on three orthogonal aspects:

      • Lifecycle status (terminal or transitional state)
      • Transient flags (abnormal or noteworthy conditions)
      • Delegation structure (wrapped or composed streams)

      Format

      The general structure is:
       SimpleName[status|P|flags]{delegates}
       

      Status section

      The status reflects the current terminal or transitional condition:
      • ERROR – the stream completed exceptionally
      • COMPLETED – the stream completed normally
      • NOT_READY – the stream is awaiting data
      • absent – stream is in its normal active state

      Additional markers

      • P – indicates a peeked entry is currently buffered
      • describeFlags() – optional subtype-specific diagnostic flags separated by "|", only used for abnormal or noteworthy conditions

      Flags

      Flags are intended for rare or abnormal conditions only, not normal operating state. They should be short, human-readable identifiers separated by "|". If no flags are present, this section is omitted.

      Delegates

      Delegates describe wrapped or composed streams that this stream builds upon. Multiple delegates are comma-separated. The currently active delegate must be prefixed with "*".

      This allows reconstruction of the stream pipeline structure for debugging purposes.

      Overrides:
      toString in class Object
      Returns:
      a structured diagnostic string representing this stream and its composition
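
A rough sketch of how a string in the SimpleName[status|P|flags]{delegates} shape could be assembled is given below. DiagnosticFormatSketch is hypothetical, and the way absent sections are dropped (including omitting empty brackets and braces entirely) is an assumption; only the omission of an empty flags section is stated in the description above. The flag name SLOW in the usage note is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical formatter; how absent sections are rendered is an assumption.
final class DiagnosticFormatSketch {

    static String describe(String simpleName, String status, boolean peeked,
                           String flags, String delegates) {
        List<String> parts = new ArrayList<>();
        if (status != null) parts.add(status);   // ERROR, COMPLETED or NOT_READY
        if (peeked) parts.add("P");              // a peeked entry is buffered
        if (flags != null && !flags.isEmpty()) parts.add(flags);

        StringBuilder text = new StringBuilder(simpleName);
        if (!parts.isEmpty()) {
            text.append('[').append(String.join("|", parts)).append(']');
        }
        if (delegates != null && !delegates.isEmpty()) {
            text.append('{').append(delegates).append('}');
        }
        return text.toString();
    }
}
```

Under these assumptions, describe("Outer", null, true, "SLOW", "*Inner[COMPLETED],Other") yields Outer[P|SLOW]{*Inner[COMPLETED],Other}.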
    • describeFlags

      protected @Nullable String describeFlags()
      Subtypes should override this to return flags that apply to this stream for debugging purposes. Flags should be short. Multiple flags should be separated with a pipe ("|").

      Only include flags for abnormal states, to reduce visual noise.

      Returns:
      the flags that apply to this stream, or null if there are no relevant flags
    • describeDelegates

      protected @Nullable String describeDelegates()
      Subtypes should override this to describe any (message stream) delegates they use for debugging purposes. This makes it possible to visualize a chain of message streams, their states and how they are linked.

      If there are multiple delegates, they should be comma-separated, with the active delegate prefixed with an asterisk ("*").

      Returns:
      the description of delegate streams, or null if there are no delegates
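
A subtype wrapping several streams might build its delegate description along these lines. DelegateDescriptionSketch and its activeIndex parameter are hypothetical; the sketch relies on each delegate's own toString() and returns null when there is nothing to describe.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper; a real subtype would do this inside describeDelegates().
final class DelegateDescriptionSketch {

    // activeIndex is the position of the currently active delegate, or -1 for none.
    static String describeDelegates(List<?> delegates, int activeIndex) {
        if (delegates.isEmpty()) {
            return null; // no delegates: nothing to render
        }
        StringJoiner joiner = new StringJoiner(",");
        for (int i = 0; i < delegates.size(); i++) {
            String marker = (i == activeIndex) ? "*" : "";
            joiner.add(marker + delegates.get(i)); // uses the delegate's toString()
        }
        return joiner.toString();
    }
}
```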