Class DelegatingMessageStream<DM extends Message,RM extends Message>
- Type Parameters:
DM- The type of Message handled by the delegate.RM- The type of Message handled by this MessageStream.
- All Implemented Interfaces:
MessageStream<RM>
- Since:
- 5.0.0
- Author:
- Allard Buijze
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.axonframework.messaging.core.MessageStream
MessageStream.Empty<M extends Message>, MessageStream.Entry<M extends Message>, MessageStream.Single<M extends Message> -
Constructor Summary
ConstructorsConstructorDescriptionDelegatingMessageStream(MessageStream<DM> delegate) Constructs the DelegatingMessageStream with givendelegateto receive calls. -
Method Summary
Modifier and TypeMethodDescriptionvoidclose()Closes this stream, indicating that the consumer is no longer interested in receiving further entries.protected MessageStream<DM> delegate()Returns the delegate as provided in the constructor.error()Indicates whether any error has been reported in this stream.booleanIndicates whether anentryis available for immediate reading.booleanIndicates whether this stream has been completed.voidsetCallback(Runnable callback) Registers the callback to invoke whenentriesare available for reading or when the stream completes (either normally or with an error), with the intent that a consumer of this stream will read the available messages completely.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.axonframework.messaging.core.MessageStream
cast, concatWith, filter, first, ignoreEntries, map, mapMessage, next, onClose, onComplete, onErrorContinue, onNext, peek, reduce
-
Constructor Details
-
DelegatingMessageStream
Constructs the DelegatingMessageStream with givendelegateto receive calls.- Parameters:
delegate- The instance to delegate calls to.
-
-
Method Details
-
setCallback
Description copied from interface:MessageStreamRegisters the callback to invoke whenentriesare available for reading or when the stream completes (either normally or with an error), with the intent that a consumer of this stream will read the available messages completely.This has the following implications:
- When you register the callback on an already completed stream, the callback is invoked directly.
- When you register the callback on a stream having entries available, the callback is invoked, and you must consume the existing entries.
- A registered callback is invoked again when all entries were consumed and new entries arrive.
- Depending on the implementation of the stream, your callback might be invoked again while you
are still consuming entries, in these cases it is your responsibility to synchronize the callback executions.
Using
MessageStream.reduce(Object, BiFunction)might be a better fit because it guarantees isolated execution of callbacks using aprocessingGate.
Note that an invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.
Any previously registered callback is replaced with the given
callback.The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error, unless the callback was called to indicate completion.
- Specified by:
setCallbackin interfaceMessageStream<DM extends Message>- Parameters:
callback- The callback to invoke whenentriesare available for reading, or the stream completes.- See Also:
-
error
Description copied from interface:MessageStreamIndicates whether any error has been reported in this stream. Implementations may choose to not return any error here until allentriesthat were available for reading before any error occurred have been consumed.- Specified by:
errorin interfaceMessageStream<DM extends Message>- Returns:
- An optional containing the possible error this stream completed with.
-
isCompleted
public boolean isCompleted()Description copied from interface:MessageStreamIndicates whether this stream has been completed. A completed stream will never returnentriesfromMessageStream.next(), andMessageStream.hasNextAvailable()will always returnfalse. If the stream completed with an error,MessageStream.error()will report so.- Specified by:
isCompletedin interfaceMessageStream<DM extends Message>- Returns:
trueif the stream completed, otherwisefalse.
-
hasNextAvailable
public boolean hasNextAvailable()Description copied from interface:MessageStreamIndicates whether anentryis available for immediate reading. When entries are reported available, there is no guarantee thatMessageStream.next()will indeed return an entry. However, besides any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returnsfalse.- Specified by:
hasNextAvailablein interfaceMessageStream<DM extends Message>- Returns:
truewhen there areentriesavailable for reading,falseotherwise.
-
close
public void close()Description copied from interface:MessageStreamCloses this stream, indicating that the consumer is no longer interested in receiving further entries.This is a consumer-driven cancellation operation. It immediately transitions the stream into a terminal state and discards any remaining or buffered entries, including any peeked entries. After this call returns:
MessageStream.next()will not return further entriesMessageStream.peek()will returnOptional.empty()MessageStream.hasNextAvailable()will returnfalse
This method is intended exclusively for consumer-side cancellation. It signals that no further processing is required and allows implementations to release all resources immediately.
This operation is fundamentally different from producer-side completion. When an implementation determines that no further elements will ever become available, it must not call
close(). Instead, it must:- stop producing new elements,
- allow any buffered elements to be consumed normally, and
- transition the stream to completion using its normal terminal signaling mechanism
(e.g. completion or error through
next()/fetchNext()).
Calling this method is only required when the consumer chooses not to fully consume the stream. Streams must always release resources when they complete, regardless of whether
close()is invoked.- Specified by:
closein interfaceMessageStream<DM extends Message>
-
delegate
Returns the delegate as provided in the constructor.- Returns:
- the delegate as provided in the constructor.
-