Class QueueMessageStream<M extends Message>
- Type Parameters:
M - The type of Message contained in this stream.
- All Implemented Interfaces:
MessageStream<M>
MessageStream implementation backed by a BlockingQueue.
This stream acts as a bridge between a producer and a consumer: entries are offered into an internal queue and consumed via the MessageStream API.
The stream supports both finite and dynamically produced sequences:
- While open, the stream may temporarily have no elements available, in which case consumption methods may indicate a "not ready" state.
- Once seal() or sealExceptionally(Throwable) is invoked, no further elements are accepted. Remaining buffered elements can still be consumed.
- After the queue is drained, the stream completes normally or exceptionally depending on how it was sealed.
If a callback is registered via AbstractMessageStream.setCallback(Runnable), it is invoked
when new elements become available. If the callback throws an exception, the
stream transitions to an error state immediately and any buffered elements
are discarded.
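The lifecycle described above can be illustrated with a small stand-in. This is a minimal sketch, not the Axon implementation: the class SealableQueueSketch, its State enum, and its state() and poll() methods are hypothetical names introduced here, and the callback behaviour is left out. It only models how an open stream reports "not ready", how sealing rejects later offers, and how the stream completes once the buffer is drained.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in, NOT the Axon class: it models only the
// open -> sealed -> drained lifecycle described in the text above.
final class SealableQueueSketch<T> {

    enum State { VALUE, NOT_READY, COMPLETED, ERROR }

    private final BlockingQueue<T> queue;
    private boolean sealed;
    private Throwable failure; // stays null when sealed normally

    // Mirrors the documented "unbounded queue" default.
    SealableQueueSketch() {
        this(new LinkedBlockingQueue<>());
    }

    SealableQueueSketch(BlockingQueue<T> queue) {
        this.queue = queue;
    }

    // Rejected once sealed, or when the underlying queue refuses the item.
    synchronized boolean offer(T item) {
        return !sealed && queue.offer(item);
    }

    synchronized void seal() {
        sealed = true;
    }

    synchronized void sealExceptionally(Throwable error) {
        sealed = true;
        failure = error;
    }

    // Non-blocking view of what a consumer would observe right now.
    synchronized State state() {
        if (!queue.isEmpty()) {
            return State.VALUE;            // buffered items are still consumable
        }
        if (!sealed) {
            return State.NOT_READY;        // open, but nothing buffered yet
        }
        return failure == null ? State.COMPLETED : State.ERROR;
    }

    synchronized T poll() {
        return queue.poll();
    }

    public static void main(String[] args) {
        SealableQueueSketch<String> stream = new SealableQueueSketch<>();
        System.out.println(stream.state());        // NOT_READY
        stream.offer("first");
        stream.seal();
        System.out.println(stream.offer("late"));  // false
        System.out.println(stream.state());        // VALUE
        System.out.println(stream.poll());         // first
        System.out.println(stream.state());        // COMPLETED
    }
}
```

The sketch uses coarse synchronized methods to keep the sealed check and the offer atomic; how the real class coordinates producers and consumers is not stated in this page.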
- Since:
- 5.0.0
- Author:
- Allard Buijze, John Hendrikx
-
Nested Class Summary
Nested classes/interfaces inherited from class org.axonframework.messaging.core.AbstractMessageStream
AbstractMessageStream.FetchResult<T extends @Nullable MessageStream.Entry<?>>
Nested classes/interfaces inherited from interface org.axonframework.messaging.core.MessageStream
MessageStream.Empty<M extends Message>, MessageStream.Entry<M extends Message>, MessageStream.Single<M extends Message>
-
Constructor Summary
Constructors
Constructor: Description
- QueueMessageStream(): Constructs a QueueMessageStream backed by an unbounded queue.
- QueueMessageStream(queue): Constructs a QueueMessageStream using the given queue as its underlying buffer.
-
Method Summary
Modifier and Type, Method: Description
- protected AbstractMessageStream.FetchResult<MessageStream.Entry<M>> fetchNext: Attempts to fetch the next available MessageStream.Entry from the underlying source.
- boolean offer: Attempts to add the given message and context to this stream.
- protected final void onCompleted(): Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally.
- void seal(): Seals this queue, preventing any further elements from being added.
- void sealExceptionally(Throwable error): Seals this queue exceptionally, indicating that no further elements will be added and that an error has occurred during production.
Methods inherited from class org.axonframework.messaging.core.AbstractMessageStream
close, describeDelegates, describeFlags, error, hasNextAvailable, initialize, isCompleted, next, peek, setCallback, signalProgress, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.axonframework.messaging.core.MessageStream
cast, concatWith, filter, first, ignoreEntries, map, mapMessage, onClose, onComplete, onErrorContinue, onNext, reduce
-
Constructor Details
-
QueueMessageStream
public QueueMessageStream()
Constructs a QueueMessageStream backed by an unbounded queue.
Offering elements will succeed as long as sufficient memory is available.
-
QueueMessageStream
Constructs a QueueMessageStream using the given queue as its underlying buffer.
Both production and consumption semantics depend on the provided queue implementation. Elements are added using BlockingQueue.offer(Object) and consumed using Queue.poll().
The queue must support buffering of elements. Implementations such as TransferQueue that rely on direct handoff without internal storage are not suitable.
- Parameters:
queue - The queue used to buffer entries between producer and consumer.
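To make the buffering requirement concrete, the following sketch offers a few items to different JDK queues with no consumer attached and counts how many are accepted. It uses only java.util.concurrent and does not touch QueueMessageStream itself; the class QueueChoiceSketch and its helper method are hypothetical. SynchronousQueue is chosen here as a JDK example of a queue with no internal capacity, which is an assumption of this sketch rather than something named in the text above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;

// Hypothetical helper: shows how BlockingQueue.offer(Object) behaves
// for different queue types when nobody is consuming yet.
final class QueueChoiceSketch {

    // Offers `count` items without any consumer and reports how many were accepted.
    static int acceptedWithoutConsumer(BlockingQueue<String> queue, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (queue.offer("message-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Unbounded: every offer is buffered.
        System.out.println(acceptedWithoutConsumer(new LinkedBlockingQueue<>(), 3));  // 3
        // Bounded to 2: the third offer is refused because the queue is full.
        System.out.println(acceptedWithoutConsumer(new ArrayBlockingQueue<>(2), 3));  // 2
        // No internal storage: offer fails unless a consumer is already waiting.
        System.out.println(acceptedWithoutConsumer(new SynchronousQueue<>(), 3));     // 0
    }
}
```

With a bounded queue, a refused offer is the producer's cue to apply its own back-pressure or retry policy; with a zero-capacity queue nothing is ever buffered for a consumer that polls later.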
-
-
Method Details
-
fetchNext
Description copied from class: AbstractMessageStream
Attempts to fetch the next available MessageStream.Entry from the underlying source.
This method is invoked by AbstractMessageStream.next() when no previously peeked entry is available. Implementations must return an AbstractMessageStream.FetchResult describing the current state of the stream:
- AbstractMessageStream.FetchResult.Value if an entry is immediately available,
- AbstractMessageStream.FetchResult.NotReady if no entry is currently available but more may arrive later,
- AbstractMessageStream.FetchResult.Completed if the stream is exhausted and will produce no further entries,
- AbstractMessageStream.FetchResult.Error if the stream has failed with an error.
Returning AbstractMessageStream.FetchResult.NotReady will transition the stream into an awaiting data state. Implementations must subsequently invoke AbstractMessageStream.signalProgress() when progress may be possible again (e.g., when new data arrives or the stream completes).
Implementations must ensure that any state changes observable via this method are fully applied before invoking AbstractMessageStream.signalProgress(). A signal that arrives before the consumer has entered the awaiting state is not replayed; correctness relies on this method returning the updated state when the consumer calls it next. See AbstractMessageStream.signalProgress() for the full ordering contract.
This method must be non-blocking. It should return immediately with the best available information about the stream's current state.
Implementations must not attempt to complete or close the stream directly. Instead, they must return AbstractMessageStream.FetchResult.Completed or AbstractMessageStream.FetchResult.Error to signal termination.
If an implementation throws an exception, the stream completes exceptionally with that exception.
- Specified by:
fetchNext in class AbstractMessageStream<M extends Message>
- Returns:
- an AbstractMessageStream.FetchResult representing the outcome of the fetch attempt
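The ordering rule (apply the state change first, signal afterwards) can be sketched in isolation. The class below is a hypothetical model, not Axon code: SignalOrderingSketch, its Fetch enum, fetchState(), publish(), complete() and setOnProgress() are illustrative names standing in for the fetch method, the producer side, and the progress signal. The point it demonstrates is that a consumer reacting to the signal always observes the updated state.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of the ordering contract; names are illustrative, not Axon's API.
final class SignalOrderingSketch {

    enum Fetch { VALUE, NOT_READY, COMPLETED }

    private final Queue<String> buffer = new ArrayDeque<>();
    private boolean completed;
    private Runnable onProgress = () -> { };

    synchronized void setOnProgress(Runnable onProgress) {
        this.onProgress = onProgress;
    }

    // Non-blocking: reports the current state and returns immediately.
    synchronized Fetch fetchState() {
        if (!buffer.isEmpty()) {
            return Fetch.VALUE;
        }
        return completed ? Fetch.COMPLETED : Fetch.NOT_READY;
    }

    synchronized String take() {
        return buffer.poll();
    }

    void publish(String item) {
        Runnable signal;
        synchronized (this) {
            buffer.add(item);      // 1. make the new state observable
            signal = onProgress;
        }
        signal.run();              // 2. only then signal progress
    }

    void complete() {
        Runnable signal;
        synchronized (this) {
            completed = true;      // 1. state change first
            signal = onProgress;
        }
        signal.run();              // 2. signal afterwards
    }

    // Records what a consumer sees initially and at each signal.
    static List<Fetch> demo() {
        SignalOrderingSketch source = new SignalOrderingSketch();
        List<Fetch> seen = new ArrayList<>();
        seen.add(source.fetchState());                           // nothing buffered yet
        source.setOnProgress(() -> seen.add(source.fetchState()));
        source.publish("a");                                     // signal follows the add
        source.take();
        source.complete();                                       // signal follows completion
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [NOT_READY, VALUE, COMPLETED]
    }
}
```

If publish() ran the signal before adding to the buffer, the callback would observe NOT_READY and, since signals are not replayed, the consumer could wait indefinitely for an item that is already there.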
-
offer
Attempts to add the given message and context to this stream.
If successful, the element becomes available for consumption and any registered callback is invoked to signal its availability.
If the callback throws an exception, the stream transitions to an error state immediately. In that case, any buffered elements are discarded and further interaction with the stream will reflect the error.
This method returns false if:
- the stream has been sealed or sealed exceptionally,
- the underlying queue cannot accept the element (e.g. bounded queue is full), or
- the stream has been closed by the consumer.
- Parameters:
message - the message to add
context - the context associated with the message
- Returns:
true if the element was accepted, otherwise false.
-
seal
public void seal()
Seals this queue, preventing any further elements from being added.
-
sealExceptionally
Seals this queue exceptionally, indicating that no further elements will be added and that an error has occurred during production.
Any Messages offered after this method is invoked will be rejected.
Already buffered elements may still be consumed via AbstractMessageStream.next() or AbstractMessageStream.peek(). Once the queue is empty, the stream will complete and AbstractMessageStream.error() will report the provided Throwable.
- Parameters:
error - the Throwable representing the error that caused the stream to fail
-
onCompleted
protected final void onCompleted()
Description copied from class: AbstractMessageStream
Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally. Subclasses may override this method to perform custom actions on completion.
If the implementation throws an exception, the stream still completes, but it will complete with the thrown exception. If the stream was about to complete with an error, and the callback fails as well, the exception is added as a suppressed exception.
- Overrides:
onCompleted in class AbstractMessageStream<M extends Message>
-