Class ContinuousMessageStream<E>
- Type Parameters:
E - the type of the raw elements returned by the fetcher before conversion to EventMessages
- All Implemented Interfaces:
MessageStream<EventMessage>
MessageStream implementation that continuously fetches event messages from a configurable data source. This
stream has no defined end and will continue retrieving new batches of data as they become available.
The stream relies on externally provided functional strategies to control its behavior:
- A fetcher to obtain the next batch of elements. The fetcher owns all cursor/position state internally and is simply called each time more data is needed.
- A converter to transform fetched elements into MessageStream.Entry instances.
- A callbackTracker to manage callback registration for new data availability.
The stream supports closing via AbstractMessageStream.close() and callback registration via AbstractMessageStream.setCallback(Runnable).
- Since:
- 5.0.0
- Author:
- John Hendrikx
Nested Class Summary
Nested classes/interfaces inherited from class org.axonframework.messaging.core.AbstractMessageStream
AbstractMessageStream.FetchResult<T extends @Nullable MessageStream.Entry<?>>
Nested classes/interfaces inherited from interface org.axonframework.messaging.core.MessageStream
MessageStream.Empty<M extends Message>, MessageStream.Entry<M extends Message>, MessageStream.Single<M extends Message>
Constructor Summary
Constructors
Constructor - Description
ContinuousMessageStream(Supplier<List<E>> fetcher, Function<E, MessageStream.Entry<EventMessage>> converter, BiFunction<ContinuousMessageStream<?>, Runnable, Registration> callbackTracker) - Creates a new ContinuousMessageStream instance configured with the given strategies.
Method Summary
Modifier and Type, Method - Description
fetchNext - Attempts to fetch the next available MessageStream.Entry from the underlying source.
protected void onCompleted() - Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally.
Methods inherited from class org.axonframework.messaging.core.AbstractMessageStream
close, describeDelegates, describeFlags, error, hasNextAvailable, initialize, isCompleted, next, peek, setCallback, signalProgress, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.axonframework.messaging.core.MessageStream
cast, concatWith, filter, first, ignoreEntries, map, mapMessage, onClose, onComplete, onErrorContinue, onNext, reduce
Constructor Details
ContinuousMessageStream
public ContinuousMessageStream(Supplier<List<E>> fetcher, Function<E, MessageStream.Entry<EventMessage>> converter, BiFunction<ContinuousMessageStream<?>, Runnable, Registration> callbackTracker)
Creates a new ContinuousMessageStream instance configured with the given strategies.
- Parameters:
fetcher - a supplier that returns the next batch of elements to emit. The fetcher is responsible for tracking its own position; it is called repeatedly whenever the stream needs more data. Must not return null, but may return an empty list to indicate no new data is currently available.
converter - a function converting each fetched element into a MessageStream.Entry containing an EventMessage
callbackTracker - a function that, given this stream and a callback Runnable, registers a listener and returns a Registration allowing it to be canceled
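The fetcher contract described above (owns its own position, never returns null, returns an empty list when nothing new is available) can be sketched with plain JDK types. CursorFetcher, its in-memory log, and its batchSize parameter are hypothetical names introduced for illustration; they are not part of the documented API, and a real fetcher would presumably read from an event store or similar source.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

/**
 * Hypothetical fetcher (not part of the documented API). It keeps its own
 * cursor over an in-memory log and hands out the next batch on each call.
 */
final class CursorFetcher<E> implements Supplier<List<E>> {

    private final List<E> log;   // stand-in for an event store or queue
    private final int batchSize; // assumed upper bound on a single batch
    private int position = 0;    // cursor state owned by the fetcher itself

    CursorFetcher(List<E> log, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.log = log;
        this.batchSize = batchSize;
    }

    @Override
    public synchronized List<E> get() {
        int available = log.size() - position;
        if (available <= 0) {
            // Nothing new yet: return an empty list, never null.
            return Collections.emptyList();
        }
        int end = position + Math.min(batchSize, available);
        // Copy the slice so later appends to the log cannot affect the batch.
        List<E> batch = new ArrayList<>(log.subList(position, end));
        position = end; // advance the cursor only after the batch is built
        return batch;
    }
}
```

A supplier shaped like this could be passed as the fetcher argument. The sketch only guards its own cursor with synchronized; making appends to the underlying log thread-safe is left out and would need separate care.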
Method Details
onCompleted
protected void onCompleted()
Description copied from class: AbstractMessageStream
Callback invoked when the stream is about to transition to a completed state, either successfully or exceptionally. Subclasses may override this method to perform custom actions on completion.
If the implementation throws an exception, the stream still completes, but it will complete with the thrown exception. If the stream was about to complete with an error, and the callback fails as well, the exception is added as a suppressed exception.
- Overrides:
onCompleted in class AbstractMessageStream<EventMessage>
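The failure rule for the completion callback can be illustrated with a small stand-alone sketch. CompletionSketch and its complete method are hypothetical helpers, not framework code, and the sketch assumes the original error stays primary while the callback failure is attached to it through Throwable.addSuppressed.

```java
/** Hypothetical helper mirroring the documented completion rule; not framework code. */
final class CompletionSketch {

    /**
     * Runs the completion hook and returns the error the stream would end with,
     * or null when it completes successfully.
     *
     * @param pendingError the error the stream was about to complete with, or null
     * @param onCompleted  stand-in for the onCompleted() callback
     */
    static Throwable complete(Throwable pendingError, Runnable onCompleted) {
        try {
            onCompleted.run();
            return pendingError; // hook succeeded: the outcome is unchanged
        } catch (RuntimeException hookFailure) {
            if (pendingError == null) {
                // A successful completion turns into an exceptional one.
                return hookFailure;
            }
            // Assumption: the original error stays primary, the hook failure is suppressed.
            pendingError.addSuppressed(hookFailure);
            return pendingError;
        }
    }
}
```

In both failure cases the stream still completes; only the reported outcome differs.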
fetchNext
Description copied from class: AbstractMessageStream
Attempts to fetch the next available MessageStream.Entry from the underlying source.
This method is invoked by AbstractMessageStream.next() when no previously peeked entry is available. Implementations must return an AbstractMessageStream.FetchResult describing the current state of the stream:
- AbstractMessageStream.FetchResult.Value if an entry is immediately available,
- AbstractMessageStream.FetchResult.NotReady if no entry is currently available but more may arrive later,
- AbstractMessageStream.FetchResult.Completed if the stream is exhausted and will produce no further entries,
- AbstractMessageStream.FetchResult.Error if the stream has failed with an error.
Returning AbstractMessageStream.FetchResult.NotReady will transition the stream into an awaiting-data state. Implementations must subsequently invoke AbstractMessageStream.signalProgress() when progress may be possible again (e.g., when new data arrives or the stream completes).
Implementations must ensure that any state changes observable via this method are fully applied before invoking AbstractMessageStream.signalProgress(). A signal that arrives before the consumer has entered the awaiting state is not replayed; correctness relies on this method returning the updated state when the consumer calls it next. See AbstractMessageStream.signalProgress() for the full ordering contract.
This method must be non-blocking. It should return immediately with the best available information about the stream's current state.
Implementations must not attempt to complete or close the stream directly. Instead, they must return AbstractMessageStream.FetchResult.Completed or AbstractMessageStream.FetchResult.Error to signal termination.
If an implementation throws an exception, the stream completes exceptionally with that exception.
- Specified by:
fetchNext in class AbstractMessageStream<EventMessage>
- Returns:
- an AbstractMessageStream.FetchResult representing the outcome of the fetch attempt
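The fetchNext contract (non-blocking, four possible outcomes, state applied before signaling) can be sketched without the framework. FetchOutcome and QueueBackedSource below are hypothetical stand-ins for AbstractMessageStream.FetchResult and a stream implementation, and the Runnable passed to the constructor stands in for signalProgress(). None of these names come from the documented API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical stand-in for the four documented fetch outcomes. */
final class FetchOutcome<T> {
    enum Kind { VALUE, NOT_READY, COMPLETED, ERROR }

    final Kind kind;
    final T value;         // set only for VALUE
    final Throwable error; // set only for ERROR

    private FetchOutcome(Kind kind, T value, Throwable error) {
        this.kind = kind;
        this.value = value;
        this.error = error;
    }

    static <T> FetchOutcome<T> value(T v) { return new FetchOutcome<>(Kind.VALUE, v, null); }
    static <T> FetchOutcome<T> notReady() { return new FetchOutcome<>(Kind.NOT_READY, null, null); }
    static <T> FetchOutcome<T> completed() { return new FetchOutcome<>(Kind.COMPLETED, null, null); }
    static <T> FetchOutcome<T> error(Throwable t) { return new FetchOutcome<>(Kind.ERROR, null, t); }
}

/** Hypothetical source showing a non-blocking fetch plus "update state, then signal". */
final class QueueBackedSource<T> {
    private final Queue<T> buffer = new ConcurrentLinkedQueue<>();
    private final Runnable signalProgress; // stand-in for signalProgress()
    private volatile boolean finished;
    private volatile Throwable failure;

    QueueBackedSource(Runnable signalProgress) {
        this.signalProgress = signalProgress;
    }

    /** Producer side: make the entry visible first, signal second. */
    void publish(T item) {
        buffer.add(item);
        signalProgress.run();
    }

    /** Producer side: record completion first, signal second. */
    void finish() {
        finished = true;
        signalProgress.run();
    }

    /** Producer side: record the failure first, signal second. */
    void fail(Throwable t) {
        failure = t;
        signalProgress.run();
    }

    /** Consumer side: never blocks and never closes anything itself. */
    FetchOutcome<T> fetchNext() {
        T next = buffer.poll();
        if (next != null) {
            return FetchOutcome.value(next);
        }
        Throwable t = failure;
        if (t != null) {
            return FetchOutcome.error(t);
        }
        if (finished) {
            // Re-check the buffer: an entry may have been published just before finish().
            T last = buffer.poll();
            return last != null ? FetchOutcome.value(last) : FetchOutcome.completed();
        }
        return FetchOutcome.notReady();
    }
}
```

Because every producer method updates state before running the signal, a consumer that calls fetchNext after a signal observes the new state, even if the signal itself arrived before the consumer started waiting. A continuous stream would rarely, if ever, reach the completed branch; it is included only to cover all four outcomes.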