Interface MessageStream<M extends Message>

Type Parameters:
M - The type of Message contained in the entries of this stream.
All Known Subinterfaces:
MessageStream.Empty<M>, MessageStream.Single<M>
All Known Implementing Classes:
AbstractQueryResponseMessageStream, CloseCallbackMessageStream, ContinuousMessageStream, DelayedMessageStream, DelegatingMessageStream, QueryResponseMessageStream, QueryUpdateMessageStream, QueueMessageStream, SourcingEventMessageStream, StreamingEventMessageStream

public interface MessageStream<M extends Message>
Represents a stream of entries containing Messages of type M that can be consumed as they become available.

A Message Stream is asynchronous by nature. All operations are non-blocking by design, although some implementations may choose to block for certain conditions. In that case, the streams must document so explicitly.

To get notified of anything potentially available for consumption, one must register a setCallback(Runnable) callback. This callback is invoked each time information is *potentially* available for consumption. There is no guarantee that entries are available for consumption when this callback is invoked. When consuming the stream, one must also ensure to check the isCompleted() status and potentially the presence of errors using error().

When clients choose not to consume a stream until completion (when isCompleted() returns true), it must be closed by calling close(). This ensures that the producing side is notified of the closure and can clean up resources.

Since:
5.0.0
Author:
Allard Buijze, Ivan Dugalic, Milan Savić, Mitchell Herrijgers, Steven van Beelen
  • Method Details

    • fromIterable

      static <M extends Message> MessageStream<M> fromIterable(@Nonnull Iterable<M> iterable)
      Create a stream that provides the Messages returned by the given iterable, automatically wrapped in an MessageStream.Entry.

      The returned stream will provide the messages as provided by the Iterable.iterator() call on the given iterable.

      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      iterable - The Iterable providing the Messages to stream.
      Returns:
      A stream of entries that return the Messages provided by the given iterable.
    • fromItems

      @SafeVarargs static <M extends Message> MessageStream<M> fromItems(@Nonnull M... items)
      Creates a MessageStream that provides the given items and then completes.
      Type Parameters:
      M - The type of message the stream contains
      Parameters:
      items - The items to return in the stream.
      Returns:
      a MessageStream that contains the given items and then completes.
    • fromIterable

      static <M extends Message> MessageStream<M> fromIterable(@Nonnull Iterable<M> iterable, @Nonnull Function<M,Context> contextSupplier)
      Create a stream that provides the Messages returned by the given iterable, automatically wrapped in an MessageStream.Entry with the resulting Context from the contextSupplier.

      The returned stream will provide the messages as provided by the Iterable.iterator() call on the given iterable.

      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      iterable - The Iterable providing the Messages to stream.
      contextSupplier - A Function ingesting each Message from the given iterable returning the Context to set for the MessageStream.Entry the Message is wrapped in.
      Returns:
      A stream of entries that return the Messages provided by the given iterable with a Context provided by the contextSupplier.
    • fromStream

      static <M extends Message> MessageStream<M> fromStream(@Nonnull Stream<M> stream)
      Create a stream that provides the Messages returned by the given stream, automatically wrapped in an MessageStream.Entry.
      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      stream - The Stream providing the Messages to stream.
      Returns:
      A stream of entries that return the Messages provided by the given stream.
    • fromStream

      static <M extends Message> MessageStream<M> fromStream(@Nonnull Stream<M> stream, @Nonnull Function<M,Context> contextSupplier)
      Create a stream that provides the Messages returned by the given stream, automatically wrapped in an MessageStream.Entry with the resulting Context from the contextSupplier.
      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      stream - The Stream providing the Messages to stream.
      contextSupplier - A Function ingesting each Message from the given stream returning the Context to set for the MessageStream.Entry the Message is wrapped in.
      Returns:
      A stream of entries that return the Messages provided by the given stream with a Context provided by the contextSupplier.
    • fromStream

      static <T, M extends Message> MessageStream<M> fromStream(@Nonnull Stream<T> stream, @Nonnull Function<T,M> messageSupplier, @Nonnull Function<T,Context> contextSupplier)
      Create a stream that provides the items of type T returned by the given stream, automatically wrapped in an MessageStream.Entry with the resulting Message and Context from the messageSupplier and the contextSupplier respectively.
      Type Parameters:
      T - The type of item contained in the given stream that will be mapped to a Message and Context by the messageSupplier and contextSupplier respectively.
      M - The type of Message contained in the entries of this stream.
      Parameters:
      stream - The Stream providing the items of type T to map to a Message and Context.
      messageSupplier - A Function ingesting each item of type T from the given stream returning the Message to set for the MessageStream.Entry to add in the resulting MessageStream.
      contextSupplier - A Function ingesting each item of type T from the given stream returning the Context to set for the MessageStream.Entry to add in the resulting MessageStream.
      Returns:
      A stream of entries that return the Messages resulting from the given messageSupplier with a Context provided by the contextSupplier.
    • fromFuture

      static <M extends Message> MessageStream.Single<M> fromFuture(@Nonnull CompletableFuture<M> future)
      Create a stream that returns a single entry wrapping the Message from the given future, once the given future completes.

      The stream will contain at most a single entry. It may also contain no entries if the future returns null. The stream will complete with an exception when the given future completes exceptionally.

      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      future - The CompletableFuture providing the Message to contain in the stream.
      Returns:
      A stream containing at most one entry from the given future.
    • fromFuture

      static <M extends Message> MessageStream.Single<M> fromFuture(@Nonnull CompletableFuture<M> future, @Nonnull Function<M,Context> contextSupplier)
      Create a stream that returns a single entry wrapping the Message from the given future, once the given future completes.

      The automatically generated Entry will have the Context as given by the contextSupplier.

      The stream will contain at most a single entry. It may also contain no entries if the future returns null. The stream will complete with an exception when the given future completes exceptionally.

      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      future - The CompletableFuture providing the Message to contain in the stream.
      contextSupplier - A Function ingesting the Message from the given future returning the Context to set for the MessageStream.Entry the Message is wrapped in.
      Returns:
      A stream containing at most one entry from the given future with a Context provided by the contextSupplier.
    • just

      static <M extends Message> MessageStream.Single<M> just(@Nullable M message)
      Create a stream containing the single given message, automatically wrapped in an MessageStream.Entry.

      Once the Entry is consumed, the stream is considered completed.

      Type Parameters:
      M - The type of Message given.
      Parameters:
      message - The Message to wrap in an MessageStream.Entry and return in the stream.
      Returns:
      A stream consisting of a single entry wrapping the given message.
    • just

      static <M extends Message> MessageStream.Single<M> just(@Nullable M message, @Nonnull Function<M,Context> contextSupplier)
      Create a stream containing the single given message, automatically wrapped in an MessageStream.Entry.

      Once the Entry is consumed, the stream is considered completed.

      Type Parameters:
      M - The type of Message given.
      Parameters:
      message - The Message to wrap in an MessageStream.Entry and return in the stream.
      contextSupplier - A Function ingesting the given message returning the Context to set for the MessageStream.Entry the message is wrapped in.
      Returns:
      A stream consisting of a single entry wrapping the given message with a Context provided by the contextSupplier.
    • failed

      static <M extends Message> MessageStream.Empty<M> failed(@Nonnull Throwable failure)
      Create a stream that completed with given failure.

      All attempts to read from this stream will propagate this error.

      Type Parameters:
      M - The type of Message contained in the entries of this stream.
      Parameters:
      failure - The Throwable to propagate to consumers of the stream.
      Returns:
      A stream that is completed exceptionally.
    • empty

      static MessageStream.Empty<Message> empty()
      Create a stream that carries no MessageStream.Entry and is considered to be successfully completed.

      Any attempt to convert this stream to a component that requires an entry to be returned (such as CompletableFuture), will have it return null.

      Returns:
      An empty stream.
    • first

      default MessageStream.Single<M> first()
      Returns a stream that includes only the first message of this stream, unless it completes without delivering any messages, in which case it completes the same way.

      When the first message is delivered, the returned stream completes normally, independently of how this stream completes. Upon consuming the first message, this stream is close() immediately.

      Returns:
      A stream that includes only the first message of this stream.
    • ignoreEntries

      default MessageStream.Empty<M> ignoreEntries()
      Returns a stream that consumes all messages from this stream, but ignores the results and completes when this stream completes.

      Unlike simply closing the stream, the returned stream will still cause upstream entries to be consumed and any registered callbacks to be invoked.

      Returns:
      An Empty stream that ignores all results.
    • next

      Returns an Optional carrying the next entry from the stream, if such entry was available. If no entry was available for reading, this method returns an empty Optional.

      This method will never block for elements becoming available.

      Returns:
      An optional carrying the next entry, if available.
    • peek

      Returns an Optional carrying the next entry from the stream (without moving the stream pointer), if such entry was available. If no entry was available for reading, this method returns an empty Optional.

      This method will never block for elements becoming available.

      Returns:
      An optional carrying the next entry, if available.
    • setCallback

      void setCallback(@Nonnull Runnable callback)
      Registers the callback to invoke when entries are available for reading or when the stream completes (either normally or with an error). An invocation of the callback does not in any way guarantee that entries are indeed available, or that the stream has indeed been completed. Implementations may choose to suppress repeated invocations of the callback if no entries have been read in the meantime.

      Any previously registered callback is replaced with the given callback.

      The callback is called on an arbitrary thread, and it should keep work performed on this thread to a minimum as this may interfere with other callbacks handled by the same thread. Any exception thrown by the callback will result in the stream completing with this exception as the error.

      Parameters:
      callback - The callback to invoke when entries are available for reading, or the stream completes.
    • error

      Indicates whether any error has been reported in this stream. Implementations may choose to not return any error here until all entries that were available for reading before any error occurred have been consumed.
      Returns:
      An optional containing the possible error this stream completed with.
    • isCompleted

      boolean isCompleted()
      Indicates whether this stream has been completed. A completed stream will never return entries from next(), and hasNextAvailable() will always return false. If the stream completed with an error, error() will report so.
      Returns:
      true if the stream completed, otherwise false.
    • hasNextAvailable

      boolean hasNextAvailable()
      Indicates whether an entry is available for immediate reading. When entries are reported available, there is no guarantee that next() will indeed return an entry. However, besides any concurrent activity on this stream, it is guaranteed that no entries are available for reading when this method returns false.
      Returns:
      true when there are entries available for reading, false otherwise.
    • close

      void close()
      Closes this stream, freeing any possible resources occupied by the underlying stream. After invocation, some entries may still be available for reading.

      Implementations must always release resources when a stream is completed, either with an error or normally. Therefore, it is only required to close() a stream if the consumer decides to not read until the end.

    • map

      default <RM extends Message> MessageStream<RM> map(@Nonnull Function<MessageStream.Entry<M>,MessageStream.Entry<RM>> mapper)
      Returns a stream that maps each entry from this stream using given mapper function into an entry carrying a MessageEntry with a Message of type RM.

      The returned stream completes the same way this stream completes.

      Type Parameters:
      RM - The declared type of Message contained in the returned entry.
      Parameters:
      mapper - The function converting entries from this stream from entries containing message of type M to RM.
      Returns:
      A stream with all entries mapped according to the mapper function.
    • mapMessage

      default <RM extends Message> MessageStream<RM> mapMessage(@Nonnull Function<M,RM> mapper)
      Returns a stream that maps each message from the entries in this stream using the given mapper function. This maps the Messages from type M to type RM.

      The returned stream completes the same way this stream completes.

      Type Parameters:
      RM - The declared type of Message contained in the returned entry.
      Parameters:
      mapper - The function converting message from the entries in this stream from type M to RM.
      Returns:
      A stream with all entries mapped according to the mapper function.
    • reduce

      default <R> CompletableFuture<R> reduce(@Nonnull R identity, @Nonnull BiFunction<R,MessageStream.Entry<M>,R> accumulator)
      Returns a CompletableFuture of type R, using the given identity as the initial value for the given accumulator. Throws an exception if this stream is unbounded.

      The accumulator will process all entries within this stream until a single value of type R is left.

      Note that parallel processing is not supported!

      Type Parameters:
      R - The result of the accumulator after reducing all entries from this stream.
      Parameters:
      identity - The initial value given to the accumulator.
      accumulator - The BiFunction accumulating all entries from this stream into a return value of type R.
      Returns:
      A CompletableFuture carrying the result of the given accumulator that reduced the entire stream.
      Throws:
      UnsupportedOperationException - if this stream is unbounded
    • onNext

      default MessageStream<M> onNext(@Nonnull Consumer<MessageStream.Entry<M>> onNext)
      Invokes the given onNext each time an entry is consumed from this stream.

      Depending on the stream's implementation, the function may be invoked when the entry is provided to the Consumer, or at the moment it's available for reading on the stream. Subscribing multiple times to the resulting stream may cause the given onNext to be invoked more than once for an entry.

      Parameters:
      onNext - The Consumer to invoke for each entry.
      Returns:
      A stream that will invoke the given onNext for each entry.
    • onErrorContinue

      default MessageStream<M> onErrorContinue(@Nonnull Function<Throwable,MessageStream<M>> onError)
      Returns a stream that, when this stream completes with an error, continues reading from the stream provided by given onError function.
      Parameters:
      onError - The Function providing the replacement stream to continue with after an exception on this stream.
      Returns:
      A stream that continues onto another stream when this stream completes with an error.
    • filter

      default MessageStream<M> filter(@Nonnull Predicate<MessageStream.Entry<M>> filter)
      Returns a stream that will filter entries based on the given filter.
      Parameters:
      filter - The MessageStream.Entry predicate, that will filter out entries. Returning true from this lambda will keep the entry, while returning false will remove it.
      Returns:
      A stream for which the entries have been filtered by the given filter.
    • concatWith

      default MessageStream<M> concatWith(@Nonnull MessageStream<M> other)
      Returns a stream that concatenates this stream with the given other stream, if this stream completes successfully. Throws an exception if this stream is unbounded.

      When this stream completes with an error, so does the returned stream.

      Parameters:
      other - The MessageStream to append to this stream.
      Returns:
      A stream concatenating this stream with given other.
      Throws:
      UnsupportedOperationException - if this stream is unbounded
    • onComplete

      default MessageStream<M> onComplete(@Nonnull Runnable completeHandler)
      Returns a stream that invokes the given completeHandler when the stream completes normally. Throws an exception if this stream is unbounded.
      Parameters:
      completeHandler - The Runnable to invoke when the stream completes normally.
      Returns:
      A stream that invokes the completeHandler upon normal completion.
      Throws:
      UnsupportedOperationException - if this stream is unbounded
    • cast

      default <T extends Message> MessageStream<T> cast()
      Casts this stream to the given type. This method is provided to be more flexible with generics. It is the caller's responsibility to ensure the cast is valid. Failure to do so may result in ClassCastException when reading elements.
      Type Parameters:
      T - The type of Message to cast the MessageStream to.
      Returns:
      This instance, cast to the given Message of type T.
    • onClose

      default MessageStream<M> onClose(Runnable closeHandler)
      Returns a stream that, when it is either explicitly closed using close(), or when this stream completes (regularly or with an error) calls the given closeHandler.
      Parameters:
      closeHandler - The handler to invoke when this stream is closed or terminates.
      Returns:
      a stream that invokes the given closeHandler upon closing or termination.