Class AbstractMessageStream<M extends Message>
- All Implemented Interfaces:
MessageStream<M>
- Direct Known Subclasses:
AbstractQueryResponseMessageStream,CloseCallbackMessageStream,ContinuousMessageStream,QueueMessageStream
MessageStream that provides full state management for
consumption, completion, error propagation, closing, and callback coordination.
This implementation enforces a non-blocking, pull-based consumption model with optional
callback-driven signaling. Consumers invoke next() to attempt to retrieve entries.
When no entry is available at that time, the stream may enter an awaiting data state.
In this state, the stream expects an external signal indicating that progress may be possible again.
Implementations must provide data through fetchNext(), which returns a
AbstractMessageStream.FetchResult describing the current availability:
AbstractMessageStream.FetchResult.Value- an entry is availableAbstractMessageStream.FetchResult.NotReady- no entry is currently available, but more may arriveAbstractMessageStream.FetchResult.Completed- the stream is exhausted and will produce no further entriesAbstractMessageStream.FetchResult.Error- the stream has failed with an error
Returning AbstractMessageStream.FetchResult.NotReady transitions the stream into an awaiting data state.
Implementations are expected to invoke signalProgress() when new data may be available,
or when the stream completes or fails asynchronously. This will trigger the registered callback
if the consumer is awaiting data. It is safe to invoke signalProgress() even if the stream
is not awaiting data; such calls are ignored.
The registered callback (see setCallback(Runnable)) is invoked when:
- entries may be available for consumption,
- the stream completes (normally or exceptionally)
next(), peek(), isCompleted(),
and error().
This class enforces a strict non-blocking contract: fetchNext() must never block.
If no data is currently available, implementations must return AbstractMessageStream.FetchResult.NotReady
and rely on signalProgress() to indicate future availability.
Stream completion is terminal. Once a AbstractMessageStream.FetchResult.Completed or AbstractMessageStream.FetchResult.Error
is returned, the stream will not produce further entries. Implementations must signal completion
exclusively via fetchNext(); they should not invoke close() directly unless
acting as a consumer of another (sub)stream they own.
If the registered callback throws an exception, the stream is completed exceptionally with that error, unless the callback was called to signal completion. This ensures that failures in progress signaling do not leave the stream in an unusable state.
Implementations may optionally initialize the stream state during construction using
initialize(FetchResult). If not invoked explicitly, the stream is implicitly initialized
on first interaction with a AbstractMessageStream.FetchResult.NotReady state. Initialization may only occur once
and cannot use AbstractMessageStream.FetchResult.Value.
This class is thread-safe; all state transitions are guarded to support concurrent interaction between producers and consumers.
Subclasses may override onCompleted() to perform cleanup when the stream reaches a
terminal state.
All methods in this class are either final, private, abstract or empty to
protect its invariants.
- Since:
- 5.1.0
- Author:
- Jan Galinski, John Hendrikx
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic interfaceAbstractMessageStream.FetchResult<T extends @Nullable MessageStream.Entry<?>>Represents the result of attempting to fetch the next element from aMessageStream.Nested classes/interfaces inherited from interface org.axonframework.messaging.core.MessageStream
MessageStream.Empty<M extends Message>, MessageStream.Entry<M extends Message>, MessageStream.Single<M extends Message> -
Constructor Summary
Constructors -
Method Summary
Modifier and TypeMethodDescriptionfinal voidclose()Closes this stream, indicating that the consumer is no longer interested in receiving further entries.protected @Nullable StringSubtypes should override this to describe any (message stream) delegates they use for debugging purposes.protected @Nullable StringSubtypes should override this to return flags that apply to this stream for debugging purposes.error()Indicates whether any error has been reported in this stream.protected abstract AbstractMessageStream.FetchResult<MessageStream.Entry<M>> Attempts to fetch the next availableMessageStream.Entryfrom the underlying source.final booleanIndicates whether anentryis available for immediate reading.protected final voidinitialize(AbstractMessageStream.FetchResult<MessageStream.Entry<M>> initialFetchResult) This method can be used after the super constructor call completes in a subtype to set the initial state of the stream.final booleanIndicates whether this stream has been completed.final Optional<MessageStream.Entry<M>> next()Returns an Optional carrying the nextentryfrom the stream, if such entry was available.protected voidCallback invoked when the stream is about to transition to a completed state, either successfully or exceptionally.final Optional<MessageStream.Entry<M>> peek()Returns an Optional carrying the nextentryfrom the stream (without moving the stream pointer), if such entry was available.final voidsetCallback(Runnable callback) Registers the callback to invoke whenentriesare available for reading or when the stream completes (either normally or with an error), with the intent that a consumer of this stream will read the available messages completely.protected final voidSignals that the stream may have made progress.toString()Returns a structured diagnostic representation of thisMessageStream.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.axonframework.messaging.core.MessageStream
cast, concatWith, filter, first, ignoreEntries, map, mapMessage, onClose, onComplete, onErrorContinue, onNext, reduce
-
Constructor Details
-
AbstractMessageStream
public AbstractMessageStream()
-
-
Method Details
-
initialize
protected final void initialize(AbstractMessageStream.FetchResult<MessageStream.Entry<M>> initialFetchResult) This method can be used after the super constructor call completes in a subtype to set the initial state of the stream. It may only be called during construction, and will throw an exception if the stream already has a valid state.- Parameters:
initialFetchResult- the initial fetch result; must not be null and must not be a Value- Throws:
NullPointerException- ifinitialFetchResultis nullIllegalArgumentException- ifinitialFetchResultis a ValueIllegalStateException- if already initialized
-
setCallback
Description copied from interface:MessageStreamRegisters the callback to invoke whenentriesare available for reading or when the stream completes (either normally or with an error), with the intent that a consumer of this stream will read the available messages completely.This has the following implications:
- When you register the callback on an already completed stream, the callback is invoked directly.
- When you register the callback on a stream having entries available, the callback is invoked, and you must consume the existing entries.
- A registered callback is invoked again when all entries were consumed and new entries arrive.
- Depending on the implementation of the stream, your callback might be invoked again while you
are still consuming entries, in these cases it is your responsibility to synchronize the callback executions.
Using
MessageStream.reduce(Object, BiFunction)might be a better fit because it guarantees isolated execution of callbacks using aprocessingGate.
Note that an invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.
Any previously registered callback is replaced with the given
callback.The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error, unless the callback was called to indicate completion.
- Specified by:
setCallbackin interfaceMessageStream<M extends Message>- Parameters:
callback- The callback to invoke whenentriesare available for reading, or the stream completes.- See Also:
-
close
public final void close()Closes this stream, indicating that the consumer is no longer interested in receiving further entries.This is a consumer-driven cancellation operation. It immediately transitions the stream into a terminal state and discards any remaining or buffered entries, including any peeked entries. After this call returns:
MessageStream.next()will not return further entriesMessageStream.peek()will returnOptional.empty()MessageStream.hasNextAvailable()will returnfalse
This method is intended exclusively for consumer-side cancellation. It signals that no further processing is required and allows implementations to release all resources immediately.
This operation is fundamentally different from producer-side completion. When an implementation determines that no further elements will ever become available, it must not call
close(). Instead, it must:- stop producing new elements,
- allow any buffered elements to be consumed normally, and
- transition the stream to completion using its normal terminal signaling mechanism
(e.g. completion or error through
next()/fetchNext()).
Calling this method is only required when the consumer chooses not to fully consume the stream. Streams must always release resources when they complete, regardless of whether
close()is invoked.This method is intended to be invoked by consumers to indicate loss of interest. Implementations should not call this method directly, unless they are acting as a consumer of another (sub)stream they own.
- Specified by:
closein interfaceMessageStream<M extends Message>
-
signalProgress
protected final void signalProgress()Signals that the stream may have made progress.This method should be invoked by implementations when an external event occurs that may allow the consumer to make progress. This includes, but is not limited to:
- new entries becoming available,
- the stream completing normally, or
- the stream failing with an error.
If the stream is currently awaiting data (i.e., a previous call to
next()returned no entry due toAbstractMessageStream.FetchResult.NotReady), this method invokes the registered callback. Otherwise, this method has no effect.This method may be invoked even if the stream is not currently awaiting data; in that case, the call has no effect.
Ordering requirement
Implementations must ensure that any state changes that make progress observable viafetchNext()are fully applied before invoking this method.In other words,
signalProgress()must only be called after the stream's internal state has been updated in a way that a subsequentfetchNext()call can observe.Failing to observe this ordering may result in a lost wake-up scenario where:
- progress is signalled, but
- the consumer observes no available element and returns
AbstractMessageStream.FetchResult.NotReady
This method is therefore strictly a notification mechanism, not a state publication mechanism. Correctness must come from state visibility in
fetchNext(), not from the timing of this signal. -
error
Description copied from interface:MessageStreamIndicates whether any error has been reported in this stream. Implementations may choose to not return any error here until allentriesthat were available for reading before any error occurred have been consumed.- Specified by:
errorin interfaceMessageStream<M extends Message>- Returns:
- An optional containing the possible error this stream completed with.
-
isCompleted
public final boolean isCompleted()Description copied from interface:MessageStreamIndicates whether this stream has been completed. A completed stream will never returnentriesfromMessageStream.next(), andMessageStream.hasNextAvailable()will always returnfalse. If the stream completed with an error,MessageStream.error()will report so.- Specified by:
isCompletedin interfaceMessageStream<M extends Message>- Returns:
trueif the stream completed, otherwisefalse.
-
hasNextAvailable
public final boolean hasNextAvailable()Description copied from interface:MessageStreamIndicates whether anentryis available for immediate reading. When entries are reported available, there is no guarantee thatMessageStream.next()will indeed return an entry. However, besides any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returnsfalse.- Specified by:
hasNextAvailablein interfaceMessageStream<M extends Message>- Returns:
truewhen there areentriesavailable for reading,falseotherwise.
-
peek
Description copied from interface:MessageStreamReturns an Optional carrying the nextentryfrom the stream (without moving the stream pointer), if such entry was available. If no entry was available for reading, this method returns an empty Optional.This method will never block for elements becoming available.
- Specified by:
peekin interfaceMessageStream<M extends Message>- Returns:
- An optional carrying the next
entry, if available.
-
next
Description copied from interface:MessageStreamReturns an Optional carrying the nextentryfrom the stream, if such entry was available. If no entry was available for reading, this method returns an empty Optional.This method will never block for elements becoming available.
- Specified by:
nextin interfaceMessageStream<M extends Message>- Returns:
- An optional carrying the next
entry, if available.
-
fetchNext
Attempts to fetch the next availableMessageStream.Entryfrom the underlying source.This method is invoked by
next()when no previously peeked entry is available. Implementations must return aAbstractMessageStream.FetchResultdescribing the current state of the stream:AbstractMessageStream.FetchResult.Valueif an entry is immediately available,AbstractMessageStream.FetchResult.NotReadyif no entry is currently available but more may arrive later,AbstractMessageStream.FetchResult.Completedif the stream is exhausted and will produce no further entries,AbstractMessageStream.FetchResult.Errorif the stream has failed with an error.
Returning
AbstractMessageStream.FetchResult.NotReadywill transition the stream into an awaiting data state. Implementations must subsequently invokesignalProgress()when progress may be possible again (e.g., when new data arrives or the stream completes).Implementations must ensure that any state changes observable via this method are fully applied before invoking
signalProgress(). A signal that arrives before the consumer has entered the awaiting state is not replayed; correctness relies on this method returning the updated state when the consumer calls it next. SeesignalProgress()for the full ordering contract.This method must be non-blocking. It should return immediately with the best available information about the stream's current state.
Implementations must not attempt to complete or close the stream directly. Instead, they must return
AbstractMessageStream.FetchResult.CompletedorAbstractMessageStream.FetchResult.Errorto signal termination.If an implementation throws an exception, the stream completes exceptionally with that exception.
- Returns:
- a
AbstractMessageStream.FetchResultrepresenting the outcome of the fetch attempt
-
onCompleted
protected void onCompleted()Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally. Subclasses may override this method to perform custom actions on completion.If the implementation throws an exception, the stream still completes, but it will complete with the thrown exception. If the stream was about to complete with an error, and the callback fails as well, the exception is added as a suppressed exception.
-
toString
Returns a structured diagnostic representation of thisMessageStream.The output is designed for debugging complex stream compositions and focuses on three orthogonal aspects:
- Lifecycle status (terminal or transitional state)
- Transient flags (abnormal or noteworthy conditions)
- Delegation structure (wrapped or composed streams)
Format
The general structure is:SimpleName[status|P|flags]{delegates}Status section
The status reflects the current terminal or transitional condition:ERROR– the stream completed exceptionallyCOMPLETED– the stream completed normallyNOT_READY– the stream is awaiting data- absent – stream is in its normal active state
Additional markers
P– indicates a peeked entry is currently buffereddescribeFlags()– optional subtype-specific diagnostic flags separated by"|", only used for abnormal or noteworthy conditions
Flags
Flags are intended for rare or abnormal conditions only, not normal operating state. They should be short, human-readable identifiers separated by"|". If no flags are present, this section is omitted.Delegates
Delegates describe wrapped or composed streams that this stream builds upon. Multiple delegates are comma-separated. The currently active delegate must be prefixed with"*".This allows reconstruction of the stream pipeline structure for debugging purposes.
-
describeFlags
Subtypes should override this to return flags that apply to this stream for debugging purposes. Flags should be short. Multiple flags should be separated with a pipe ("|").Only include flags for abnormal states, to reduce visual noise.
- Returns:
- the flags that apply to this stream, or
nullif there are no relevant flags
-
describeDelegates
Subtypes should override this to describe any (message stream) delegates they use for debugging purposes. This allows to visualize a chain of message streams, their states and how they are linked.If there are multiple delegates, then they should be comma separated with the active delegate prepended with an asterisk ("*").
- Returns:
- the description of delegate streams, or
nullif there are no delegates
-