public class AxonServerQueryBus extends Object implements QueryBus, Distributed<QueryBus>, Lifecycle
QueryBus implementation that connects to Axon Server to submit and receive queries and query responses.
Delegates incoming queries to the provided localSegment.| Modifier and Type | Class and Description |
|---|---|
static class |
AxonServerQueryBus.Builder
Builder class to instantiate an
AxonServerQueryBus. |
Lifecycle.LifecycleHandler, Lifecycle.LifecycleRegistry| Constructor and Description |
|---|
AxonServerQueryBus(AxonServerQueryBus.Builder builder)
Instantiate a
AxonServerQueryBus based on the fields contained in the AxonServerQueryBus.Builder. |
| Modifier and Type | Method and Description |
|---|---|
static AxonServerQueryBus.Builder |
builder()
Instantiate a Builder to be able to create an
AxonServerQueryBus. |
void |
disconnect()
Disconnect the query bus from Axon Server, by unsubscribing all known query handlers.
|
QueryBus |
localSegment()
Return the message bus of type
MessageBus which is regarded as the local segment for this implementation. |
<Q,R> CompletableFuture<QueryResponseMessage<R>> |
query(QueryMessage<Q,R> queryMessage)
Dispatch the given
query to a single QueryHandler subscribed to the given query's queryName and
responseType. |
QueryUpdateEmitter |
queryUpdateEmitter()
Gets the
QueryUpdateEmitter associated with this QueryBus. |
Registration |
registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?,?>> dispatchInterceptor)
Register the given DispatchInterceptor.
|
Registration |
registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?,?>> interceptor)
Register the given
handlerInterceptor. |
void |
registerLifecycleHandlers(Lifecycle.LifecycleRegistry lifecycle)
Registers the activities to be executed in the various phases of an application's lifecycle.
|
<Q,R> Stream<QueryResponseMessage<R>> |
scatterGather(QueryMessage<Q,R> queryMessage,
long timeout,
TimeUnit timeUnit)
Dispatch the given
query to all QueryHandlers subscribed to the given query's
queryName/responseType. |
CompletableFuture<Void> |
shutdownDispatching()
Shutdown the query bus asynchronously for dispatching queries to Axon Server.
|
void |
start()
Start the Axon Server
QueryBus implementation. |
<Q,R> org.reactivestreams.Publisher<QueryResponseMessage<R>> |
streamingQuery(StreamingQueryMessage<Q,R> query)
Builds a
Publisher of responses to the given query. |
<R> Registration |
subscribe(String queryName,
Type responseType,
MessageHandler<? super QueryMessage<?,R>> handler)
Subscribe the given
handler to queries with the given queryName and responseType. |
<Q,I,U> SubscriptionQueryResult<QueryResponseMessage<I>,SubscriptionQueryUpdateMessage<U>> |
subscriptionQuery(SubscriptionQueryMessage<Q,I,U> query,
int updateBufferSize)
Dispatch the given
query to a single QueryHandler subscribed to the given query's
queryName/initialResponseType/updateResponseType. |
<Q,I,U> SubscriptionQueryResult<QueryResponseMessage<I>,SubscriptionQueryUpdateMessage<U>> |
subscriptionQuery(SubscriptionQueryMessage<Q,I,U> query,
SubscriptionQueryBackpressure backPressure,
int updateBufferSize)
Deprecated.
in favor of using the {
subscriptionQuery(SubscriptionQueryMessage, int)} |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitsubscriptionQuerypublic AxonServerQueryBus(AxonServerQueryBus.Builder builder)
AxonServerQueryBus based on the fields contained in the AxonServerQueryBus.Builder.builder - the AxonServerQueryBus.Builder used to instantiate a AxonServerQueryBus instancepublic <Q,R> org.reactivestreams.Publisher<QueryResponseMessage<R>> streamingQuery(StreamingQueryMessage<Q,R> query)
QueryBusPublisher of responses to the given query. The actual query is not dispatched until
there is a subscription to the result. The query is dispatched to a single query handler. Implementations may opt
for invoking several query handlers and then choosing a response from single one for performance or resilience
reasons.
When no handlers are available that can answer the given query, the return Publisher will be completed
with a NoHandlerForQueryException.
streamingQuery in interface QueryBusQ - the payload type of the streaming queryR - the response type of the streaming queryquery - the streaming query messagepublic static AxonServerQueryBus.Builder builder()
AxonServerQueryBus.
The QueryPriorityCalculator is defaulted to
QueryPriorityCalculator.defaultQueryPriorityCalculator(), the TargetContextResolver defaults to a
lambda returning the AxonServerConfiguration.getContext() as the context, the
ExecutorServiceBuilder defaults to ExecutorServiceBuilder.defaultQueryExecutorServiceBuilder().
The AxonServerConnectionManager and the SpanFactory defaults to a NoOpSpanFactory. The
AxonServerConfiguration, the local QueryBus, the QueryUpdateEmitter, and the message and
generic Serializers are hard requirements and as such should be provided.
AxonServerQueryBuspublic void registerLifecycleHandlers(@Nonnull Lifecycle.LifecycleRegistry lifecycle)
LifecycleregisterLifecycleHandlers in interface Lifecyclelifecycle - the lifecycle instance to register the handlers withLifecycle.LifecycleRegistry.onShutdown(int, Runnable),
LifecycleRegistry#onShutdown(int, LifecycleHandler),
Lifecycle.LifecycleRegistry.onStart(int, Runnable),
LifecycleRegistry#onStart(int, LifecycleHandler)public void start()
QueryBus implementation.public <R> Registration subscribe(@Nonnull String queryName, @Nonnull Type responseType, @Nonnull MessageHandler<? super QueryMessage<?,R>> handler)
QueryBushandler to queries with the given queryName and responseType.
Multiple handlers may subscribe to the same combination of queryName/responseType.public <Q,R> CompletableFuture<QueryResponseMessage<R>> query(@Nonnull QueryMessage<Q,R> queryMessage)
QueryBusquery to a single QueryHandler subscribed to the given query's queryName and
responseType. This method returns all values returned by the Query Handler as a Collection. This may or may not
be the exact collection as defined in the Query Handler.
If the Query Handler defines a single return object (i.e. not a collection or array), that object is returned as the sole entry in a singleton collection.
When no handlers are available that can answer the given query, the returned CompletableFuture will be
completed with a NoHandlerForQueryException.
public <Q,R> Stream<QueryResponseMessage<R>> scatterGather(@Nonnull QueryMessage<Q,R> queryMessage, long timeout, @Nonnull TimeUnit timeUnit)
QueryBusquery to all QueryHandlers subscribed to the given query's
queryName/responseType. Returns a stream of results which blocks until all handlers have processed the request or
when the timeout occurs.
If no handlers are available to provide a result, or when all available handlers throw an exception while attempting to do so, the returned Stream is empty.
Note that any terminal operation (such as Stream.forEach(Consumer)) on the Stream may cause it to block
until the timeout has expired, awaiting additional data to include in the stream.
scatterGather in interface QueryBusQ - the payload type of the queryR - the response type of the queryqueryMessage - the querytimeout - time to wait for resultstimeUnit - unit for the timeout@Deprecated public <Q,I,U> SubscriptionQueryResult<QueryResponseMessage<I>,SubscriptionQueryUpdateMessage<U>> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q,I,U> query, SubscriptionQueryBackpressure backPressure, int updateBufferSize)
subscriptionQuery(SubscriptionQueryMessage, int)}query to a single QueryHandler subscribed to the given query's
queryName/initialResponseType/updateResponseType. The result is lazily created and there will be no execution of
the query handler before there is a subscription to the initial result. In order not to miss updates, the query
bus will queue all updates which happen after the subscription query is done and once the subscription to the
flux is made, these updates will be emitted.
If there is an error during retrieving or consuming initial result, stream for incremental updates is NOT interrupted.
If there is an error during emitting an update, subscription is cancelled causing further emits not reaching the destination.
Provided backpressure mechanism will be used to deal with fast emitters.
subscriptionQuery in interface QueryBusQ - the payload type of the queryI - the response type of the queryU - the incremental response types of the queryquery - the querybackPressure - the backpressure mechanism to be used for emitting updatesupdateBufferSize - the size of buffer which accumulates updates before subscription to the flux is
madepublic <Q,I,U> SubscriptionQueryResult<QueryResponseMessage<I>,SubscriptionQueryUpdateMessage<U>> subscriptionQuery(@Nonnull SubscriptionQueryMessage<Q,I,U> query, int updateBufferSize)
QueryBusquery to a single QueryHandler subscribed to the given query's
queryName/initialResponseType/updateResponseType. The result is lazily created and there will be no execution of
the query handler before there is a subscription to the initial result. In order not to miss updates, the query
bus will queue all updates which happen after the subscription query is done and once the subscription to the
flux is made, these updates will be emitted.
If there is an error during retrieving or consuming initial result, stream for incremental updates is NOT interrupted.
If there is an error during emitting an update, subscription is cancelled causing further emits not reaching the destination.
subscriptionQuery in interface QueryBusQ - the payload type of the queryI - the response type of the queryU - the incremental response types of the queryquery - the queryupdateBufferSize - the size of buffer which accumulates updates before subscription to the flux is
madepublic QueryUpdateEmitter queryUpdateEmitter()
QueryBusQueryUpdateEmitter associated with this QueryBus.queryUpdateEmitter in interface QueryBusQueryUpdateEmitterpublic QueryBus localSegment()
DistributedMessageBus which is regarded as the local segment for this implementation.
Would return the message bus used to dispatch and handle messages in a local environment to bridge the gap in a
distributed set up.localSegment in interface Distributed<QueryBus>MessageBus which is the local segment for this distributed message bus implementationpublic Registration registerHandlerInterceptor(@Nonnull MessageHandlerInterceptor<? super QueryMessage<?,?>> interceptor)
MessageHandlerInterceptorSupporthandlerInterceptor. After registration, the interceptor will be invoked for each
handled Message on the messaging component that it was registered to, prior to invoking the message's handler.registerHandlerInterceptor in interface MessageHandlerInterceptorSupport<QueryMessage<?,?>>interceptor - The interceptor to register@Nonnull public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super QueryMessage<?,?>> dispatchInterceptor)
MessageDispatchInterceptorSupportregisterDispatchInterceptor in interface MessageDispatchInterceptorSupport<QueryMessage<?,?>>dispatchInterceptor - The interceptor to registerpublic void disconnect()
Phase.INBOUND_QUERY_CONNECTOR phase.public CompletableFuture<Void> shutdownDispatching()
Phase.OUTBOUND_QUERY_CONNECTORS phase.Copyright © 2010–2023. All rights reserved.