Class MultiStreamableEventSource
- All Implemented Interfaces:
StreamableEventSource,TrackingTokenSource
StreamableEventSource implementation that allows streaming event processors to process events from multiple
underlying event sources. This is useful when events need to be consumed from:
- Multiple event stores
- Multiple Axon Server contexts
- Different storage types (for example, an Event Store and a Kafka topic)
Events from the different sources are merged using a MergedMessageStream, with the order of event consumption
determined by a configurable Comparator. The default comparator returns the oldest event available (based on
the event's timestamp). Each source is tracked independently using a MultiSourceTrackingToken, which maintains
the position for each individual source.
- Since:
- 5.1.0
- Author:
- Allard Buijze, Greg Woods
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic interfaceIntermediate builder for collecting event sources before creating a MultiStreamableEventSource. -
Field Summary
FieldsModifier and TypeFieldDescriptionfinal Context.ResourceKey<String> Resource key used to identify which source an event originated from. -
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedMultiStreamableEventSource(Map<String, StreamableEventSource> eventSources, Comparator<MessageStream.Entry<EventMessage>> eventComparator) Constructs a MultiStreamableEventSource from the collected sources and comparator. -
Method Summary
Modifier and TypeMethodDescriptioncombining(String sourceName, StreamableEventSource source) Creates a new MultiStreamableEventSource by combining multiple event sources.firstToken(@Nullable ProcessingContext context) Creates aTrackingTokenrepresenting the first position of theevent stream.latestToken(@Nullable ProcessingContext context) Creates aTrackingTokenrepresenting the latest position, thus pointing at the next event of theevent stream.open(StreamingCondition condition, @Nullable ProcessingContext context) tokenAt(Instant at, @Nullable ProcessingContext context)
-
Field Details
-
SOURCE_ID_RESOURCE
Resource key used to identify which source an event originated from. This key can be accessed in theeventComparatorto determine the source of an event and apply custom ordering logic, such as giving precedence to events from a specific source.
-
-
Constructor Details
-
MultiStreamableEventSource
protected MultiStreamableEventSource(Map<String, StreamableEventSource> eventSources, Comparator<MessageStream.Entry<EventMessage>> eventComparator) Constructs a MultiStreamableEventSource from the collected sources and comparator.- Parameters:
eventSources- The map of event sources, keyed by their unique names.eventComparator- The comparator to use when deciding which message to process first.
-
-
Method Details
-
combining
public static MultiStreamableEventSource.SourceCollector combining(String sourceName, StreamableEventSource source) Creates a new MultiStreamableEventSource by combining multiple event sources. This is the starting point for the fluent builder API.Example usage:
// With default timestamp-based comparison MultiStreamableEventSource source = MultiStreamableEventSource .combining("eventStore1", eventSource1) .and("eventStore2", eventSource2) .comparingTimestamps(); // With custom comparator MultiStreamableEventSource source = MultiStreamableEventSource .combining("eventStore1", eventSource1) .and("eventStore2", eventSource2) .comparingUsing(customComparator);- Parameters:
sourceName- A unique name identifying the first source.source- The first event source to include.- Returns:
- A SourceCollector for adding more sources and configuring the comparator.
-
open
public MessageStream<EventMessage> open(StreamingCondition condition, @Nullable ProcessingContext context) Description copied from interface:StreamableEventSourceOpen anevent streamcontaining alleventsmatching the givencondition.To retrieve the
positionof the returned events, theTrackingToken.fromContext(Context)operation should be used by providing the entireMessageStream.Entrywrapping the returned events.Note that the returned stream is infinite, so beware of applying terminal operations to the returned stream.
When all events are of interest during streaming, then use
EventCriteria.havingAnyTag()as the condition criteria.- Specified by:
openin interfaceStreamableEventSource- Parameters:
condition- TheStreamingConditiondefining thestarting positionof the stream andevent criteriato filter the stream with.context- The currentProcessingContext, if any.- Returns:
- An
event streammatching the givencondition.
-
firstToken
Description copied from interface:TrackingTokenSourceCreates aTrackingTokenrepresenting the first position of theevent stream.As the retrieved token represents the point from which to
the event stream, the first event to be streamed when opening is the one right after the returned token.invalid reference
openSubsequent invocation of this method will yield the same result, unless the stream's initial values are deleted.
- Specified by:
firstTokenin interfaceTrackingTokenSource- Parameters:
context- The currentProcessingContext, if any.- Returns:
- A
CompletableFutureof aTrackingTokenrepresenting the first event of theevent stream.
-
latestToken
Description copied from interface:TrackingTokenSourceCreates aTrackingTokenrepresenting the latest position, thus pointing at the next event of theevent stream.As the retrieved token represents the point from which to
the event stream, the first event to be streamed when opening is the one right after the returned token.invalid reference
openSince the
event streamof this source is theoretically infinite, subsequent invocation of this operation typically return a different token. Only if thisStreamableEventSourceis idle, will severallatestToken()invocations result in the sameTrackingToken.- Specified by:
latestTokenin interfaceTrackingTokenSource- Parameters:
context- The currentProcessingContext, if any.- Returns:
- A
CompletableFutureof aTrackingTokenrepresenting the latest event, thus pointing at the next event of theevent stream.
-
tokenAt
Description copied from interface:TrackingTokenSourceCreates aTrackingTokentracking alleventsafter the givenatfrom anevent stream.When there is an
EventMessageexactly at the givendateTime, it will be tracked too.- Specified by:
tokenAtin interfaceTrackingTokenSource- Parameters:
at- TheInstantdetermining how theTrackingTokenshould be created. The returned token points at very first event before thisInstant.context- The currentProcessingContext, if any.- Returns:
- A
CompletableFutureofTrackingTokenpointing at the very first event before the givenatof theevent stream.
-