Interface MessageStream<M extends Message>
- All Known Subinterfaces:
MessageStream.Empty<M>, MessageStream.Single<M>
- All Known Implementing Classes:
AbstractQueryResponseMessageStream, CloseCallbackMessageStream, ContinuousMessageStream, DelayedMessageStream, DelegatingMessageStream, QueryResponseMessageStream, QueryUpdateMessageStream, QueueMessageStream, SourcingEventMessageStream, StreamingEventMessageStream
A stream of entries containing Messages of type M that can be consumed as they become available.
A Message Stream is asynchronous by nature. All operations are non-blocking by design, although some implementations may choose to block under certain conditions. Implementations that do so must document this explicitly.
To be notified when anything is potentially available for consumption, register a callback using setCallback(Runnable).
This callback is invoked each time information is *potentially* available for consumption. There is no
guarantee that entries are actually available when the callback is invoked. When consuming the stream, also
check the isCompleted() status and check for errors using error().
When clients choose not to consume a stream until completion (when isCompleted() returns true), it
must be closed by calling close(). This ensures that the producing side is notified of the closure and can
clean up resources.
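The consumption pattern described above can be sketched as follows. This is a minimal, single-threaded illustration, not the framework's implementation: `SketchStream`, `InMemorySketchStream`, and `CallbackConsumer` are hypothetical stand-ins that only borrow the method names documented on this page, and entries are plain strings instead of MessageStream.Entry instances. The point it shows is that the callback is treated as a hint: the consumer reads until next() returns an empty Optional, and only then checks isCompleted() and error().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in that borrows the documented method names; NOT the real MessageStream.
interface SketchStream<E> {
    Optional<E> next();
    boolean isCompleted();
    Optional<Throwable> error();
    void setCallback(Runnable callback);
    void close();
}

// Hypothetical single-threaded, in-memory implementation used only for this illustration.
final class InMemorySketchStream<E> implements SketchStream<E> {
    private final Queue<E> queue = new ArrayDeque<>();
    private Runnable callback = () -> { };
    private boolean done;
    private Throwable failure;

    void publish(E entry) { queue.add(entry); callback.run(); }
    void complete() { done = true; callback.run(); }
    void fail(Throwable cause) { failure = cause; done = true; callback.run(); }

    public Optional<E> next() { return Optional.ofNullable(queue.poll()); }
    // Assumption of this sketch: completion is only reported once buffered entries are consumed.
    public boolean isCompleted() { return done && queue.isEmpty(); }
    public Optional<Throwable> error() {
        return isCompleted() ? Optional.ofNullable(failure) : Optional.empty();
    }
    public void setCallback(Runnable callback) { this.callback = callback; }
    public void close() { queue.clear(); done = true; }
}

final class CallbackConsumer {
    final List<String> received = new ArrayList<>();
    String outcome = "open";

    void attach(SketchStream<String> stream) {
        stream.setCallback(() -> drain(stream));
        // Drain once right away to pick up entries that arrived before registration.
        drain(stream);
    }

    private void drain(SketchStream<String> stream) {
        // The callback is only a hint, so keep reading until next() comes back empty.
        Optional<String> entry;
        while ((entry = stream.next()).isPresent()) {
            received.add(entry.get());
        }
        if (stream.isCompleted()) {
            outcome = stream.error().map(e -> "failed: " + e.getMessage()).orElse("completed");
        }
    }

    public static void main(String[] args) {
        InMemorySketchStream<String> stream = new InMemorySketchStream<>();
        CallbackConsumer consumer = new CallbackConsumer();
        stream.publish("first"); // arrives before any callback is registered
        consumer.attach(stream);
        stream.publish("second");
        stream.complete();
        System.out.println(consumer.received + " " + consumer.outcome);
    }
}
```

A consumer that stops reading before `outcome` leaves the "open" state would, following the paragraph above, call close() on the stream so the producing side can clean up.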
- Since:
- 5.0.0
- Author:
- Allard Buijze, Ivan Dugalic, Milan Savić, Mitchell Herrijgers, Steven van Beelen
-
Nested Class Summary
Nested Classes (Modifier and Type, Interface: Description)
- static interface MessageStream.Empty<M extends Message>: A MessageStream implementation that completes normally or with an error without returning any elements.
- static interface MessageStream.Entry<M extends Message>: A MessageStream-specific container of Message implementations.
- static interface MessageStream.Single<M extends Message>: A MessageStream implementation that returns at most a single result before completing.
-
Method Summary
Methods (Modifier and Type, Method: Description)
- default <T extends Message> MessageStream<T> cast(): Casts this stream to the given type.
- void close(): Closes this stream, freeing any possible resources occupied by the underlying stream.
- default MessageStream<M> concatWith(MessageStream<M> other): Returns a stream that concatenates this stream with the given other stream, if this stream completes successfully.
- static MessageStream.Empty<Message> empty(): Create a stream that carries no MessageStream.Entry and is considered to be successfully completed.
- error(): Indicates whether any error has been reported in this stream.
- static <M extends Message> MessageStream.Empty<M> failed: Create a stream that completed with the given failure.
- default MessageStream<M> filter(Predicate<MessageStream.Entry<M>> filter): Returns a stream that will filter entries based on the given filter.
- default MessageStream.Single<M> first(): Returns a stream that includes only the first message of this stream, unless it completes without delivering any messages, in which case it completes the same way.
- static <M extends Message> MessageStream.Single<M> fromFuture(CompletableFuture<M> future)
- static <M extends Message> MessageStream.Single<M> fromFuture(CompletableFuture<M> future, Function<M, Context> contextSupplier)
- static <M extends Message> MessageStream<M> fromItems(M... items): Creates a MessageStream that provides the given items and then completes.
- static <M extends Message> MessageStream<M> fromIterable(Iterable<M> iterable): Create a stream that provides the Messages returned by the given iterable, automatically wrapped in a MessageStream.Entry.
- static <M extends Message> MessageStream<M> fromIterable(Iterable<M> iterable, Function<M, Context> contextSupplier): Create a stream that provides the Messages returned by the given iterable, automatically wrapped in a MessageStream.Entry with the resulting Context from the contextSupplier.
- static <M extends Message> MessageStream<M> fromStream(Stream<M> stream): Create a stream that provides the Messages returned by the given stream, automatically wrapped in a MessageStream.Entry.
- static <M extends Message> MessageStream<M> fromStream(Stream<M> stream, Function<M, Context> contextSupplier): Create a stream that provides the Messages returned by the given stream, automatically wrapped in a MessageStream.Entry with the resulting Context from the contextSupplier.
- static <T,M extends Message> MessageStream<M> fromStream(Stream<T> stream, Function<T, M> messageSupplier, Function<T, Context> contextSupplier): Create a stream that provides the items of type T returned by the given stream, automatically wrapped in a MessageStream.Entry with the resulting Message and Context from the messageSupplier and the contextSupplier respectively.
- boolean hasNextAvailable(): Indicates whether an entry is available for immediate reading.
- default MessageStream.Empty<M> ignoreEntries: Returns a stream that consumes all messages from this stream, but ignores the results and completes when this stream completes.
- boolean isCompleted(): Indicates whether this stream has been completed.
- static <M extends Message> MessageStream.Single<M> just(M message): Create a stream containing the single given message, automatically wrapped in a MessageStream.Entry.
- static <M extends Message> MessageStream.Single<M> just(M message, Function<M, Context> contextSupplier): Create a stream containing the single given message, automatically wrapped in a MessageStream.Entry.
- default <RM extends Message> MessageStream<RM> map(Function<MessageStream.Entry<M>, MessageStream.Entry<RM>> mapper)
- default <RM extends Message> MessageStream<RM> mapMessage(Function<M, RM> mapper)
- Optional<MessageStream.Entry<M>> next(): Returns an Optional carrying the next entry from the stream, if such an entry was available.
- default MessageStream<M> onClose: Returns a stream that calls the given closeHandler when it is explicitly closed using close(), or when this stream completes (regularly or with an error).
- default MessageStream<M> onComplete(Runnable completeHandler): Returns a stream that invokes the given completeHandler when the stream completes normally.
- default MessageStream<M> onErrorContinue(Function<Throwable, MessageStream<M>> onError): Returns a stream that, when this stream completes with an error, continues reading from the stream provided by the given onError function.
- default MessageStream<M> onNext(Consumer<MessageStream.Entry<M>> onNext): Invokes the given onNext each time an entry is consumed from this stream.
- Optional<MessageStream.Entry<M>> peek(): Returns an Optional carrying the next entry from the stream (without moving the stream pointer), if such an entry was available.
- default <R> CompletableFuture<R> reduce(R identity, BiFunction<R, MessageStream.Entry<M>, R> accumulator): Returns a CompletableFuture of type R, using the given identity as the initial value for the given accumulator.
- void setCallback(Runnable callback): Registers the callback to invoke when entries are available for reading or when the stream completes (either normally or with an error).
-
Method Details
-
fromIterable
Create a stream that provides the Messages returned by the given iterable, automatically wrapped in a MessageStream.Entry.
The returned stream will provide the messages as provided by the Iterable.iterator() call on the given iterable.
-
fromItems
Creates a MessageStream that provides the given items and then completes.
- Type Parameters:
M - The type of message the stream contains.
- Parameters:
items - The items to return in the stream.
- Returns:
- A MessageStream that contains the given items and then completes.
-
fromIterable
static <M extends Message> MessageStream<M> fromIterable(@Nonnull Iterable<M> iterable, @Nonnull Function<M, Context> contextSupplier)
Create a stream that provides the Messages returned by the given iterable, automatically wrapped in a MessageStream.Entry with the resulting Context from the contextSupplier.
The returned stream will provide the messages as provided by the Iterable.iterator() call on the given iterable.
- Type Parameters:
M - The type of Message contained in the entries of this stream.
- Parameters:
iterable - The Iterable providing the Messages to stream.
contextSupplier - A Function ingesting each Message from the given iterable and returning the Context to set for the MessageStream.Entry the Message is wrapped in.
- Returns:
- A stream of entries that return the Messages provided by the given iterable with a Context provided by the contextSupplier.
-
fromStream
Create a stream that provides the Messages returned by the given stream, automatically wrapped in a MessageStream.Entry.
-
fromStream
static <M extends Message> MessageStream<M> fromStream(@Nonnull Stream<M> stream, @Nonnull Function<M, Context> contextSupplier)
Create a stream that provides the Messages returned by the given stream, automatically wrapped in a MessageStream.Entry with the resulting Context from the contextSupplier.
- Type Parameters:
M - The type of Message contained in the entries of this stream.
- Parameters:
stream - The Stream providing the Messages to stream.
contextSupplier - A Function ingesting each Message from the given stream and returning the Context to set for the MessageStream.Entry the Message is wrapped in.
- Returns:
- A stream of entries that return the Messages provided by the given stream with a Context provided by the contextSupplier.
-
fromStream
static <T,M extends Message> MessageStream<M> fromStream(@Nonnull Stream<T> stream, @Nonnull Function<T, M> messageSupplier, @Nonnull Function<T, Context> contextSupplier)
Create a stream that provides the items of type T returned by the given stream, automatically wrapped in a MessageStream.Entry with the resulting Message and Context from the messageSupplier and the contextSupplier respectively.
- Type Parameters:
T - The type of item contained in the given stream that will be mapped to a Message and Context by the messageSupplier and contextSupplier respectively.
M - The type of Message contained in the entries of this stream.
- Parameters:
stream - The Stream providing the items of type T to map to a Message and Context.
messageSupplier - A Function ingesting each item of type T from the given stream and returning the Message to set for the MessageStream.Entry to add to the resulting MessageStream.
contextSupplier - A Function ingesting each item of type T from the given stream and returning the Context to set for the MessageStream.Entry to add to the resulting MessageStream.
- Returns:
- A stream of entries that return the Messages resulting from the given messageSupplier with a Context provided by the contextSupplier.
-
fromFuture
Create a stream that returns a single entry wrapping the Message from the given future, once the given future completes.
The stream will contain at most a single entry. It may also contain no entries if the future returns null. The stream will complete with an exception when the given future completes exceptionally.
- Type Parameters:
M - The type of Message contained in the entries of this stream.
- Parameters:
future - The CompletableFuture providing the Message to contain in the stream.
- Returns:
- A stream containing at most one entry from the given future.
-
fromFuture
static <M extends Message> MessageStream.Single<M> fromFuture(@Nonnull CompletableFuture<M> future, @Nonnull Function<M, Context> contextSupplier)
Create a stream that returns a single entry wrapping the Message from the given future, once the given future completes.
The automatically generated Entry will have the Context as given by the contextSupplier.
The stream will contain at most a single entry. It may also contain no entries if the future returns null. The stream will complete with an exception when the given future completes exceptionally.
- Type Parameters:
M - The type of Message contained in the entries of this stream.
- Parameters:
future - The CompletableFuture providing the Message to contain in the stream.
contextSupplier - A Function ingesting the Message from the given future and returning the Context to set for the MessageStream.Entry the Message is wrapped in.
- Returns:
- A stream containing at most one entry from the given future with a Context provided by the contextSupplier.
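The three outcomes described for fromFuture (one entry, no entries for a null result, or completion with an exception) can be illustrated with a plain CompletableFuture. This is only a sketch of the documented semantics: `FutureOutcomeSketch` and `describe` are hypothetical names, no framework types are used, and the blocking join() call is there purely to keep the demonstration short, whereas the stream itself is described as non-blocking.

```java
import java.util.concurrent.CompletableFuture;

final class FutureOutcomeSketch {

    // Hypothetical helper: reports which of the three documented outcomes a future leads to.
    static String describe(CompletableFuture<String> future) {
        return future.handle((value, failure) -> {
            if (failure != null) {
                // Exceptional completion: the stream would complete with this exception.
                return "error: " + failure.getMessage();
            }
            // A null result means no entry at all; otherwise exactly one entry.
            return value == null ? "no entries" : "one entry: " + value;
        }).join(); // blocks until the future completes; acceptable for a demo only
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(CompletableFuture.completedFuture("hello")));
        System.out.println(describe(CompletableFuture.completedFuture(null)));
        System.out.println(describe(failed));
    }
}
```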
-
just
Create a stream containing the single given message, automatically wrapped in a MessageStream.Entry.
Once the Entry is consumed, the stream is considered completed.
- Type Parameters:
M - The type of Message given.
- Parameters:
message - The Message to wrap in a MessageStream.Entry and return in the stream.
- Returns:
- A stream consisting of a single entry wrapping the given message.
-
just
static <M extends Message> MessageStream.Single<M> just(@Nullable M message, @Nonnull Function<M, Context> contextSupplier)
Create a stream containing the single given message, automatically wrapped in a MessageStream.Entry.
Once the Entry is consumed, the stream is considered completed.
- Type Parameters:
M - The type of Message given.
- Parameters:
message - The Message to wrap in a MessageStream.Entry and return in the stream.
contextSupplier - A Function ingesting the given message and returning the Context to set for the MessageStream.Entry the message is wrapped in.
- Returns:
- A stream consisting of a single entry wrapping the given message with a Context provided by the contextSupplier.
-
failed
Create a stream that has completed with the given failure.
All attempts to read from this stream will propagate this error.
-
empty
Create a stream that carries no MessageStream.Entry and is considered to be successfully completed.
Any attempt to convert this stream to a component that requires an entry to be returned (such as CompletableFuture) will have it return null.
- Returns:
- An empty stream.
-
first
Returns a stream that includes only the first message of this stream, unless it completes without delivering any messages, in which case it completes the same way.
When the first message is delivered, the returned stream completes normally, independently of how this stream completes. Upon consuming the first message, close() is invoked on this stream immediately.
- Returns:
- A stream that includes only the first message of this stream.
-
ignoreEntries
Returns a stream that consumes all messages from this stream, but ignores the results and completes when this stream completes.
Unlike simply closing the stream, the returned stream will still cause upstream entries to be consumed and any registered callbacks to be invoked.
- Returns:
- An Empty stream that ignores all results.
-
next
Optional<MessageStream.Entry<M>> next()
Returns an Optional carrying the next entry from the stream, if such an entry is available. If no entry is available for reading, this method returns an empty Optional.
This method will never block waiting for elements to become available.
- Returns:
- An optional carrying the next entry, if available.
-
peek
Optional<MessageStream.Entry<M>> peek()
Returns an Optional carrying the next entry from the stream (without moving the stream pointer), if such an entry is available. If no entry is available for reading, this method returns an empty Optional.
This method will never block waiting for elements to become available.
- Returns:
- An optional carrying the next entry, if available.
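The difference between peek() and next() ("without moving the stream pointer") can be shown with a small buffer-backed stand-in. `PeekSketch` is a hypothetical class that only borrows the two method names; it holds plain values rather than MessageStream.Entry instances and does not model completion or callbacks.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Optional;
import java.util.Queue;

// Hypothetical stand-in: a fixed buffer exposing peek() and next() with the documented semantics.
final class PeekSketch<E> {
    private final Queue<E> buffer = new ArrayDeque<>();

    PeekSketch(List<E> entries) {
        buffer.addAll(entries);
    }

    // Looks at the head of the buffer without removing it; empty Optional when nothing is buffered.
    Optional<E> peek() {
        return Optional.ofNullable(buffer.peek());
    }

    // Removes and returns the head of the buffer; empty Optional when nothing is buffered.
    Optional<E> next() {
        return Optional.ofNullable(buffer.poll());
    }

    public static void main(String[] args) {
        PeekSketch<String> stream = new PeekSketch<>(List.of("a", "b"));
        System.out.println(stream.peek()); // head is inspected, not consumed
        System.out.println(stream.peek()); // same entry again
        System.out.println(stream.next()); // now the entry is consumed
        System.out.println(stream.peek()); // the following entry becomes visible
    }
}
```

Neither method blocks in this sketch: when the buffer is empty, both simply return an empty Optional.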
-
setCallback
Registers the callback to invoke when entries are available for reading or when the stream completes (either normally or with an error). An invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.
Any previously registered callback is replaced with the given callback.
The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error.
- Parameters:
callback - The callback to invoke when entries are available for reading, or the stream completes.
-
error
Indicates whether any error has been reported in this stream. Implementations may choose to not return any error here until all entries that were available for reading before any error occurred have been consumed.
- Returns:
- An optional containing the possible error this stream completed with.
-
isCompleted
boolean isCompleted()
Indicates whether this stream has been completed. A completed stream will never return entries from next(), and hasNextAvailable() will always return false. If the stream completed with an error, error() will report so.
- Returns:
true if the stream completed, otherwise false.
-
hasNextAvailable
boolean hasNextAvailable()
Indicates whether an entry is available for immediate reading. When entries are reported available, there is no guarantee that next() will indeed return an entry. However, barring any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returns false.
- Returns:
true when there are entries available for reading, false otherwise.
-
close
void close()
Closes this stream, freeing any possible resources occupied by the underlying stream. After invocation, some entries may still be available for reading.
Implementations must always release resources when a stream is completed, either with an error or normally. Therefore, it is only required to close() a stream if the consumer decides to not read until the end.
-
map
default <RM extends Message> MessageStream<RM> map(@Nonnull Function<MessageStream.Entry<M>, MessageStream.Entry<RM>> mapper) -
mapMessage
-
reduce
default <R> CompletableFuture<R> reduce(@Nonnull R identity, @Nonnull BiFunction<R, MessageStream.Entry<M>, R> accumulator)
Returns a CompletableFuture of type R, using the given identity as the initial value for the given accumulator. Throws an exception if this stream is unbounded.
The accumulator will process all entries within this stream until a single value of type R is left.
Note that parallel processing is not supported!
- Type Parameters:
R - The result of the accumulator after reducing all entries from this stream.
- Parameters:
identity - The initial value given to the accumulator.
accumulator - The BiFunction accumulating all entries from this stream into a return value of type R.
- Returns:
- A CompletableFuture carrying the result of the given accumulator that reduced the entire stream.
- Throws:
UnsupportedOperationException - if this stream is unbounded
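The roles of identity and accumulator can be sketched as a sequential left fold. This is an illustration under simplifying assumptions, not the framework's implementation: `ReduceSketch` is a hypothetical helper, the input is a finite Iterable of plain values standing in for a bounded stream of entries, and the result is wrapped in an already-completed CompletableFuture because everything is available up front.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

final class ReduceSketch {

    // Hypothetical helper: folds all entries, in order, starting from the identity value.
    static <E, R> CompletableFuture<R> reduce(Iterable<E> entries,
                                              R identity,
                                              BiFunction<R, E, R> accumulator) {
        R result = identity;
        for (E entry : entries) {
            // Strictly sequential: each step feeds the previous result into the accumulator.
            result = accumulator.apply(result, entry);
        }
        return CompletableFuture.completedFuture(result);
    }

    public static void main(String[] args) {
        // Sums the lengths of all entries, starting from 0.
        CompletableFuture<Integer> total =
                reduce(List.of("a", "bb", "ccc"), 0, (sum, entry) -> sum + entry.length());
        System.out.println(total.join());
    }
}
```

With no entries at all, the fold never invokes the accumulator and the future simply carries the identity value.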
-
onNext
Invokes the given onNext each time an entry is consumed from this stream.
Depending on the stream's implementation, the function may be invoked when the entry is provided to the Consumer, or at the moment it's available for reading on the stream. Subscribing multiple times to the resulting stream may cause the given onNext to be invoked more than once for an entry.
-
onErrorContinue
Returns a stream that, when this stream completes with an error, continues reading from the stream provided by the given onError function.
- Parameters:
onError - The Function providing the replacement stream to continue with after an exception on this stream.
- Returns:
- A stream that continues onto another stream when this stream completes with an error.
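The fallback behavior can be sketched with plain iterators. `ErrorContinueSketch`, `readAll`, and `failingAfter` are hypothetical names, a RuntimeException thrown by the primary iterator stands in for "this stream completes with an error", and the sketch assumes that entries read before the failure are kept while reading continues from the replacement.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

final class ErrorContinueSketch {

    // Hypothetical helper: reads the primary source and switches to a replacement on failure.
    static <E> List<E> readAll(Iterator<E> primary, Function<Throwable, Iterator<E>> onError) {
        List<E> result = new ArrayList<>();
        try {
            while (primary.hasNext()) {
                result.add(primary.next());
            }
        } catch (RuntimeException failure) {
            // The failure is handed to onError, which supplies the source to continue with.
            Iterator<E> replacement = onError.apply(failure);
            while (replacement.hasNext()) {
                result.add(replacement.next());
            }
        }
        return result;
    }

    // Hypothetical source that yields the given items and then fails with the given exception.
    static Iterator<String> failingAfter(List<String> items, RuntimeException failure) {
        Iterator<String> delegate = items.iterator();
        return new Iterator<String>() {
            public boolean hasNext() {
                return true; // never ends normally; it either yields an item or fails
            }
            public String next() {
                if (delegate.hasNext()) {
                    return delegate.next();
                }
                throw failure;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<String> primary = failingAfter(List.of("a", "b"), new IllegalStateException("boom"));
        List<String> all = readAll(primary, e -> List.of("fallback after " + e.getMessage()).iterator());
        System.out.println(all);
    }
}
```

When the primary source ends without a failure, onError is never invoked and only the primary entries are returned.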
-
filter
Returns a stream that will filter entries based on the given filter.
- Parameters:
filter - The MessageStream.Entry predicate that will filter out entries. Returning true from this lambda will keep the entry, while returning false will remove it.
- Returns:
- A stream for which the entries have been filtered by the given filter.
-
concatWith
Returns a stream that concatenates this stream with the given other stream, if this stream completes successfully. Throws an exception if this stream is unbounded.
When this stream completes with an error, so does the returned stream.
- Parameters:
other - The MessageStream to append to this stream.
- Returns:
- A stream concatenating this stream with the given other.
- Throws:
UnsupportedOperationException - if this stream is unbounded
-
onComplete
Returns a stream that invokes the given completeHandler when the stream completes normally. Throws an exception if this stream is unbounded.
- Parameters:
completeHandler - The Runnable to invoke when the stream completes normally.
- Returns:
- A stream that invokes the completeHandler upon normal completion.
- Throws:
UnsupportedOperationException - if this stream is unbounded
-
cast
Casts this stream to the given type. This method is provided to be more flexible with generics. It is the caller's responsibility to ensure the cast is valid. Failure to do so may result in ClassCastException when reading elements.
-
onClose
Returns a stream that calls the given closeHandler when it is explicitly closed using close(), or when this stream completes (regularly or with an error).
- Parameters:
closeHandler - The handler to invoke when this stream is closed or terminates.
- Returns:
- A stream that invokes the given closeHandler upon closing or termination.
-