public class SimpleQueryBus extends Object implements QueryBus, QueryUpdateEmitter
In case multiple handlers are registered for the same query and response type, the query(QueryMessage)
method will invoke one of these handlers. Which one is unspecified.
Constructor and Description |
---|
SimpleQueryBus()
Initialize the query bus without monitoring on messages and a
LoggingQueryInvocationErrorHandler . |
SimpleQueryBus(MessageMonitor<? super QueryMessage<?,?>> messageMonitor,
MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> updateMessageMonitor,
TransactionManager transactionManager,
QueryInvocationErrorHandler errorHandler)
Initialize the query bus with the given
messageMonitor , updateMessageMonitor , transactionManager and given errorHandler . |
SimpleQueryBus(MessageMonitor<? super QueryMessage<?,?>> messageMonitor,
TransactionManager transactionManager,
QueryInvocationErrorHandler errorHandler)
Initialize the query bus with the given
messageMonitor and given errorHandler . |
SimpleQueryBus(TransactionManager transactionManager)
Initialize the query bus using given
transactionManager to manage transactions around query execution
with. |
Modifier and Type | Method and Description |
---|---|
Set<SubscriptionQueryMessage<?,?,?>> |
activeSubscriptions()
Provides the set of running subscription queries.
|
void |
complete(Predicate<SubscriptionQueryMessage<?,?,?>> filter)
Completes subscription queries matching given filter.
|
void |
completeExceptionally(Predicate<SubscriptionQueryMessage<?,?,?>> filter,
Throwable cause)
Completes with an error subscription queries matching given filter.
|
<U> void |
emit(Predicate<SubscriptionQueryMessage<?,?,U>> filter,
SubscriptionQueryUpdateMessage<U> update)
Emits incremental update (as return value of provided update function) to subscription queries matching given
filter.
|
protected Map<String,Collection<org.axonframework.queryhandling.QuerySubscription>> |
getSubscriptions()
Returns the subscriptions for this query bus.
|
<Q,R> CompletableFuture<QueryResponseMessage<R>> |
query(QueryMessage<Q,R> query)
Dispatch the given
query to a single QueryHandler subscribed to the given query 's queryName
and responseType. |
Registration |
registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?,?>> interceptor)
Registers an interceptor that intercepts Queries as they are sent.
|
Registration |
registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?,?>> interceptor)
Registers an interceptor that is used to intercept Queries before they are passed to their
respective handlers.
|
<Q,R> Stream<QueryResponseMessage<R>> |
scatterGather(QueryMessage<Q,R> query,
long timeout,
TimeUnit unit)
Dispatch the given
query to all QueryHandlers subscribed to the given query 's queryName/responseType. |
<R> Registration |
subscribe(String queryName,
Type responseType,
MessageHandler<? super QueryMessage<?,R>> handler)
Subscribe the given
handler to queries with the given queryName and responseType . |
<Q,I,U> SubscriptionQueryResult<QueryResponseMessage<I>,SubscriptionQueryUpdateMessage<U>> |
subscriptionQuery(SubscriptionQueryMessage<Q,I,U> query,
SubscriptionQueryBackpressure backpressure,
int updateBufferSize)
Dispatch the given
query to a single QueryHandler subscribed to the given query 's
queryName/initialResponseType/updateResponseType. |
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
subscriptionQuery
complete, completeExceptionally, emit, emit, emit
public SimpleQueryBus()
LoggingQueryInvocationErrorHandler
.public SimpleQueryBus(TransactionManager transactionManager)
transactionManager
to manage transactions around query execution
with. No monitoring is applied to messages and a LoggingQueryInvocationErrorHandler
is used
to log errors on handlers during a scatter-gather query.transactionManager
- The transaction manager to manage transactions around query execution withpublic SimpleQueryBus(MessageMonitor<? super QueryMessage<?,?>> messageMonitor, TransactionManager transactionManager, QueryInvocationErrorHandler errorHandler)
messageMonitor
and given errorHandler
.messageMonitor
- The message monitor notified for incoming messages and their resulttransactionManager
- The transaction manager to manage transactions around query execution witherrorHandler
- The error handler to invoke when query handler report an errorpublic SimpleQueryBus(MessageMonitor<? super QueryMessage<?,?>> messageMonitor, MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> updateMessageMonitor, TransactionManager transactionManager, QueryInvocationErrorHandler errorHandler)
messageMonitor
, updateMessageMonitor
, transactionManager
and given errorHandler
.messageMonitor
- The message monitor notified for incoming messages and their resultupdateMessageMonitor
- The message monitor notified for incoming update message in regard to subscription
queriestransactionManager
- The transaction manager to manage transactions around query execution witherrorHandler
- The error handler to invoke when query handler report an errorpublic <R> Registration subscribe(String queryName, Type responseType, MessageHandler<? super QueryMessage<?,R>> handler)
QueryBus
handler
to queries with the given queryName
and responseType
.
Multiple handlers may subscribe to the same combination of queryName/responseType.public <Q,R> CompletableFuture<QueryResponseMessage<R>> query(QueryMessage<Q,R> query)
QueryBus
query
to a single QueryHandler subscribed to the given query
's queryName
and responseType. This method returns all values returned by the Query Handler as a Collection. This may or may
not be the exact collection as defined in the Query Handler.
If the Query Handler defines a single return object (i.e. not a collection or array), that object is returned as the sole entry in a singleton collection.
When no handlers are available that can answer the given query
, the returned CompletableFuture will be
completed with a NoHandlerForQueryException
.
public <Q,R> Stream<QueryResponseMessage<R>> scatterGather(QueryMessage<Q,R> query, long timeout, TimeUnit unit)
QueryBus
query
to all QueryHandlers subscribed to the given query
's queryName/responseType.
Returns a stream of results which blocks until all handlers have processed the request or when the timeout occurs.
If no handlers are available to provide a result, or when all available handlers throw an exception while attempting to do so, the returned Stream is empty.
Note that any terminal operation (such as Stream.forEach(Consumer)
) on the Stream may cause it to
block until the timeout
has expired, awaiting additional data to include in the stream.
scatterGather
in interface QueryBus
Q
- the payload type of the queryR
- the response type of the queryquery
- the querytimeout
- time to wait for resultsunit
- unit for the timeoutpublic <Q,I,U> SubscriptionQueryResult<QueryResponseMessage<I>,SubscriptionQueryUpdateMessage<U>> subscriptionQuery(SubscriptionQueryMessage<Q,I,U> query, SubscriptionQueryBackpressure backpressure, int updateBufferSize)
QueryBus
query
to a single QueryHandler subscribed to the given query
's
queryName/initialResponseType/updateResponseType. The result is lazily created and there will be no execution of
the query handler before there is a subscription to the initial result. In order not to miss updates, the query
bus will queue all updates which happen after the subscription query is done and once the subscription to the
flux is made, these updates will be emitted.
If there is an error during retrieving or consuming initial result, stream for incremental updates is NOT interrupted.
If there is an error during emitting an update, subscription is cancelled causing further emits not reaching the destination.
Provided backpressure mechanism will be used to deal with fast emitters.
subscriptionQuery
in interface QueryBus
Q
- the payload type of the queryI
- the response type of the queryU
- the incremental response types of the queryquery
- the querybackpressure
- the backpressure mechanism to be used for emitting updatesupdateBufferSize
- the size of buffer which accumulates updates before subscription to the flux
is
madepublic <U> void emit(Predicate<SubscriptionQueryMessage<?,?,U>> filter, SubscriptionQueryUpdateMessage<U> update)
QueryUpdateEmitter
emit
in interface QueryUpdateEmitter
U
- the type of the updatefilter
- predicate on subscription query message used to filter subscription queriesupdate
- incremental update messagepublic void complete(Predicate<SubscriptionQueryMessage<?,?,?>> filter)
QueryUpdateEmitter
complete
in interface QueryUpdateEmitter
filter
- predicate on subscription query message used to filter subscription queriespublic void completeExceptionally(Predicate<SubscriptionQueryMessage<?,?,?>> filter, Throwable cause)
QueryUpdateEmitter
completeExceptionally
in interface QueryUpdateEmitter
filter
- predicate on subscription query message used to filter subscription queriescause
- the cause of an errorpublic Set<SubscriptionQueryMessage<?,?,?>> activeSubscriptions()
protected Map<String,Collection<org.axonframework.queryhandling.QuerySubscription>> getSubscriptions()
public Registration registerHandlerInterceptor(MessageHandlerInterceptor<? super QueryMessage<?,?>> interceptor)
interceptor
- the interceptor to invoke before passing a Query to the handlerpublic Registration registerDispatchInterceptor(MessageDispatchInterceptor<? super QueryMessage<?,?>> interceptor)
interceptor
- the interceptor to invoke when sending a QueryCopyright © 2010–2018. All rights reserved.