Class DistributedQueryBus

java.lang.Object
org.axonframework.messaging.queryhandling.distributed.DistributedQueryBus
All Implemented Interfaces:
DescribableComponent, QueryBus, QueryHandlerRegistry<QueryBus>

public class DistributedQueryBus extends Object implements QueryBus
Implementation of a QueryBus that is aware of multiple instances of a QueryBus working together to spread the load.

Each "physical" QueryBus instance is considered a "segment" of a conceptual distributed QueryBus.

The DistributedQueryBus relies on a QueryBusConnector to dispatch queries and query responses to different segments of the QueryBus. Depending on the implementation used, each segment may run in a different JVM.

Since:
5.0.0
Author:
Steven van Beelen, Jan Galinski
  • Constructor Details

    • DistributedQueryBus

      public DistributedQueryBus(@Nonnull QueryBus localSegment, @Nonnull QueryBusConnector connector, @Nonnull DistributedQueryBusConfiguration configuration)
      Constructs a DistributedQueryBus using the given localSegment for subscribing handlers and the given connector to dispatch and receive queries and query responses with, to and from different segments of the QueryBus.
      Parameters:
      localSegment - The local QueryBus used to subscribe handlers to.
      connector - The QueryBusConnector to dispatch and receive queries and query responses with.
      configuration - The DistributedQueryBusConfiguration containing the ExecutorServices for querying and handling query responses.
  • Method Details

    • subscribe

      public QueryBus subscribe(@Nonnull QualifiedName queryName, @Nonnull QueryHandler queryHandler)
      Description copied from interface: QueryHandlerRegistry
      Subscribe the given queryHandler for queries and responses of the given queryName.

      If a subscription already exists for the queryName, the behavior is undefined. Implementations may throw an exception to refuse the duplicate subscription, or alternatively decide whether the existing or the new handler gets the subscription.
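
      As an illustration of one of the permitted behaviors, the sketch below refuses a duplicate subscription by throwing an exception. It is a minimal stand-in, not the Axon Framework implementation: SketchQueryRegistry, the String query name, and the Function-based handler are all hypothetical names introduced here.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical stand-in for a handler registry; NOT the Axon Framework API.
final class SketchQueryRegistry {

    // One handler per query name, keyed by a plain String for simplicity.
    private final Map<String, Function<Object, Object>> handlers = new ConcurrentHashMap<>();

    // Registers the handler and returns this registry for fluent chaining.
    // A second handler for the same name is refused with an exception.
    SketchQueryRegistry subscribe(String queryName, Function<Object, Object> handler) {
        Function<Object, Object> existing = handlers.putIfAbsent(queryName, handler);
        if (existing != null) {
            throw new IllegalStateException("Duplicate subscription for query name: " + queryName);
        }
        return this;
    }

    boolean hasHandlerFor(String queryName) {
        return handlers.containsKey(queryName);
    }
}
```

      Because the documented behavior is undefined, callers should not rely on an exception being thrown; an implementation could equally keep the existing handler or replace it.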

      Specified by:
      subscribe in interface QueryHandlerRegistry<QueryBus>
      Parameters:
      queryName - The fully qualified name of the query
      queryHandler - The handler instance that handles queries for the given queryName.
      Returns:
      This registry for fluent interfacing.
    • query

      @Nonnull public MessageStream<QueryResponseMessage> query(@Nonnull QueryMessage query, @Nullable ProcessingContext context)
      Description copied from interface: QueryBus
      Dispatch the given query to a QueryHandler subscribed to the given query's query name, returning a MessageStream of responses to the given query.

      The resulting MessageStream will contain 0, 1, or N QueryResponseMessages, depending on the QueryHandler that handled the given query.

      As several QueryHandlers can be registered for the same query name, this method loops through them (in insertion order) until one has a suitable return value. A suitable response is any value or user exception returned from a QueryHandler. When no handlers are available that can answer the given query, the returned MessageStream will have failed with a NoHandlerForQueryException.
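
      The handler selection described above can be sketched roughly as follows. This is a simplified model under stated assumptions: a CompletableFuture stands in for the MessageStream, an empty Optional stands in for "no suitable return value", and SketchDispatcher and SketchNoHandlerException are hypothetical names, not Axon Framework types.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for NoHandlerForQueryException.
final class SketchNoHandlerException extends RuntimeException {
    SketchNoHandlerException(String message) {
        super(message);
    }
}

// Hypothetical dispatcher; NOT the Axon Framework implementation.
final class SketchDispatcher {

    // Tries the handlers in list (insertion) order and returns the first suitable response.
    // If none can answer, the result is a failed future rather than a thrown exception.
    static CompletableFuture<Object> dispatch(Object query,
                                              List<Function<Object, Optional<Object>>> handlers) {
        for (Function<Object, Optional<Object>> handler : handlers) {
            Optional<Object> response = handler.apply(query);
            if (response.isPresent()) {
                return CompletableFuture.completedFuture(response.get());
            }
        }
        return CompletableFuture.failedFuture(
                new SketchNoHandlerException("No handler for query: " + query));
    }
}
```

      Reporting the missing handler through the returned result, instead of throwing, mirrors the documented contract that the returned stream fails.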

      Specified by:
      query in interface QueryBus
      Parameters:
      query - The query to dispatch.
      context - The processing context under which the query is being published (can be null).
      Returns:
      A MessageStream containing either 0, 1, or N QueryResponseMessages.
    • subscriptionQuery

      @Nonnull public MessageStream<QueryResponseMessage> subscriptionQuery(@Nonnull QueryMessage query, @Nullable ProcessingContext context, int updateBufferSize)
      Description copied from interface: QueryBus
      Dispatch the given query to a single QueryHandler subscribed to the given query's queryName/initialResponseType/updateResponseType. The result is created lazily: the query handler is not executed until there is a subscription to the initial result. So that no updates are missed, the query bus queues all updates that occur after the subscription query is dispatched, and emits them once the update stream is subscribed to.

      If an error occurs while retrieving or consuming the initial result, the stream of incremental updates is NOT interrupted.

      If an error occurs while emitting an update, the subscription is cancelled, so further updates do not reach the destination.

      If a subscription query with the same query identifier is already registered, the returned MessageStream will be failed with a SubscriptionQueryAlreadyRegisteredException instead of throwing the exception. This allows callers to handle double subscription scenarios gracefully through the stream API.

      Specified by:
      subscriptionQuery in interface QueryBus
      Parameters:
      query - The subscription query to dispatch.
      context - The processing context under which the query is being published (can be null).
      updateBufferSize - The size of the buffer that accumulates updates.
      Returns:
      The query result containing the initial result and incremental updates, or a failed MessageStream with SubscriptionQueryAlreadyRegisteredException if a subscription with the same query identifier already exists.
    • subscribeToUpdates

      @Nonnull public MessageStream<SubscriptionQueryUpdateMessage> subscribeToUpdates(@Nonnull QueryMessage query, int updateBufferSize)
      Description copied from interface: QueryBus
      Subscribes the given query with the given updateBufferSize, and returns the MessageStream that provides the updates of the subscription query.

      Use this method directly when fine-grained control over update handling is required. If consuming the updates directly is not mandatory for your use case, we strongly recommend using QueryBus.subscriptionQuery(QueryMessage, ProcessingContext, int) instead.

      Note that the returned MessageStream must be consumed before the buffer fills up. Once the buffer is full, any attempt to add an update will complete the stream with an exception.
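
      The buffer behavior described above can be pictured with a small bounded queue. The sketch below is only an approximation under stated assumptions: SketchUpdateBuffer is a hypothetical class, String values stand in for update messages, and a stored failure stands in for the stream completing with an exception.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a bounded update buffer; NOT the Axon Framework implementation.
final class SketchUpdateBuffer {

    private final BlockingQueue<String> buffer;
    private volatile Throwable failure;

    SketchUpdateBuffer(int updateBufferSize) {
        this.buffer = new ArrayBlockingQueue<>(updateBufferSize);
    }

    // Adds an update; returns false if the buffer has already failed or is full.
    // Hitting a full buffer marks the buffer as failed, after which it accepts no more updates.
    boolean emit(String update) {
        if (failure != null) {
            return false;
        }
        if (!buffer.offer(update)) {
            failure = new IllegalStateException("Update buffer overflow");
            return false;
        }
        return true;
    }

    // Consumes the next buffered update, or returns null when none is buffered.
    String poll() {
        return buffer.poll();
    }

    boolean isFailed() {
        return failure != null;
    }
}
```

      In this model, a consumer that polls at least as fast as updates arrive never triggers the failure, which is why the buffer size should be chosen with the expected update rate in mind.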

      If a subscription query with the same query identifier is already registered, the returned MessageStream will be failed with a SubscriptionQueryAlreadyRegisteredException instead of throwing the exception.

      Specified by:
      subscribeToUpdates in interface QueryBus
      Parameters:
      query - The subscription query for which we register an update handler.
      updateBufferSize - The size of the buffer that accumulates updates.
      Returns:
      A MessageStream of updates for the given subscription query, or a failed MessageStream with SubscriptionQueryAlreadyRegisteredException if a subscription with the same query identifier already exists.
    • emitUpdate

      @Nonnull public CompletableFuture<Void> emitUpdate(@Nonnull Predicate<QueryMessage> filter, @Nonnull Supplier<SubscriptionQueryUpdateMessage> updateSupplier, @Nullable ProcessingContext context)
      Description copied from interface: QueryBus
      Emits the outcome of the updateSupplier to subscription queries matching the given filter.
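
      The interplay of filter and updateSupplier can be sketched as follows. This is a simplified model, not the Axon Framework implementation: SketchUpdateEmitter is hypothetical, String values stand in for QueryMessage and SubscriptionQueryUpdateMessage, and the ProcessingContext parameter is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical model of update emission; NOT the Axon Framework implementation.
final class SketchUpdateEmitter {

    // Active subscription queries (by hypothetical query name) and the updates delivered to each.
    private final Map<String, List<String>> subscriptions = new LinkedHashMap<>();

    void register(String queryName) {
        subscriptions.put(queryName, new ArrayList<>());
    }

    // Delivers the supplied update to every subscription query accepted by the filter.
    // The returned future is already complete because delivery here is synchronous.
    CompletableFuture<Void> emitUpdate(Predicate<String> filter, Supplier<String> updateSupplier) {
        subscriptions.forEach((queryName, updates) -> {
            if (filter.test(queryName)) {
                updates.add(updateSupplier.get());
            }
        });
        return CompletableFuture.completedFuture(null);
    }

    List<String> updatesFor(String queryName) {
        return subscriptions.get(queryName);
    }
}
```

      In this sketch the supplier is only invoked for matching subscriptions, so no update is built when nothing matches; whether the real implementation defers the supplier in the same way is an assumption.
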
      Specified by:
      emitUpdate in interface QueryBus
      Parameters:
      filter - A predicate filtering on QueryMessages. The update provided by the updateSupplier will only be sent to subscription queries matching this filter.
      updateSupplier - The supplier of the update to emit to subscription queries matching the given filter.
      context - The processing context under which the update is being emitted (can be null).
      Returns:
      A future completing once the update provided by the updateSupplier has been emitted.
    • completeSubscriptions

      @Nonnull public CompletableFuture<Void> completeSubscriptions(@Nonnull Predicate<QueryMessage> filter, @Nullable ProcessingContext context)
      Description copied from interface: QueryBus
      Completes subscription queries matching the given filter.

      To be used when there are no further updates left to emit.

      Specified by:
      completeSubscriptions in interface QueryBus
      Parameters:
      filter - A predicate filtering on QueryMessages. Subscription queries matching this filter will be completed.
      context - The processing context within which to complete subscription queries (can be null).
      Returns:
      A future completing whenever all matching subscription queries have been completed.
    • completeSubscriptionsExceptionally

      @Nonnull public CompletableFuture<Void> completeSubscriptionsExceptionally(@Nonnull Predicate<QueryMessage> filter, @Nonnull Throwable cause, @Nullable ProcessingContext context)
      Description copied from interface: QueryBus
      Completes subscription queries matching the given filter exceptionally with the given cause.

      To be used when emitting updates should be stopped because of an exception.

      Specified by:
      completeSubscriptionsExceptionally in interface QueryBus
      Parameters:
      filter - A predicate filtering on QueryMessages. Subscription queries matching this filter will be completed exceptionally.
      cause - The cause of the error with which to complete the matching subscription queries.
      context - The processing context within which to complete subscription queries exceptionally (can be null).
      Returns:
      A future completing whenever all matching subscription queries have been completed exceptionally.
    • describeTo

      public void describeTo(@Nonnull ComponentDescriptor descriptor)
      Description copied from interface: DescribableComponent
      Describe the properties of this DescribableComponent with the given descriptor.

      Components should call the appropriate describeProperty methods on the descriptor to register their properties. The descriptor is responsible for determining how these properties are formatted and structured in the final output.

      Best Practices: As a general rule, all relevant fields of a DescribableComponent implementation should be described in this method. However, developers have discretion to include only the fields that make sense in the context. Not every field may be meaningful for description purposes, especially internal implementation details. Furthermore, components might want to expose different information based on their current state. The final decision on what properties to include lies with the person implementing the describeTo method, who should focus on providing information that is useful for understanding the component's configuration and state.

      Example implementation:

       public void describeTo(ComponentDescriptor descriptor) {
           descriptor.describeProperty("name", this.name);
           descriptor.describeProperty("enabled", this.enabled);
           descriptor.describeProperty("configuration", this.configuration); // A nested component
           descriptor.describeProperty("handlers", this.eventHandlers);      // A collection
       }
       
      Specified by:
      describeTo in interface DescribableComponent
      Parameters:
      descriptor - The component descriptor to describe this DescribableComponent and its properties in.