Class JpaSequencedDeadLetterQueue<M extends EventMessage>
- Type Parameters:
M- An implementation ofMessagecontained in thedead-letterswithin this queue.
- All Implemented Interfaces:
SequencedDeadLetterQueue<M>
SequencedDeadLetterQueue, used for storing dead letters containing
EventMessages durably as a DeadLetterEntry.
Keeps the insertion order intact by saving an incremented index within each unique sequence, backed by the
DeadLetterEntry.getSequenceIndex() property. Each sequence is uniquely identified by the sequence identifier,
stored in the DeadLetterEntry.getSequenceIdentifier() field.
When processing an item, single execution across all applications is guaranteed by setting the
DeadLetterEntry.getProcessingStarted() property, locking other processes out of the sequence for the
configured claimDuration (30 seconds by default).
The stored entries are converted to a JpaDeadLetter when they need to be processed or
filtered. In order to restore the original EventMessage the configured DeadLetterJpaConverter is
used. The default EventMessageDeadLetterJpaConverter supports all EventMessage implementations
provided by the framework.
invalid reference
upcasters
- Since:
- 4.6.0
- Author:
- Mitchell Herrijgers
-
Nested Class Summary
Nested ClassesModifier and TypeClassDescriptionstatic classBuilder class to instantiate anJpaSequencedDeadLetterQueue. -
Constructor Summary
ConstructorsModifierConstructorDescriptionprotectedInstantiate a JPASequencedDeadLetterQueuebased on the givenbuilder. -
Method Summary
Modifier and TypeMethodDescriptionamountOfSequences(@Nullable ProcessingContext context) Returns the number of unique sequences contained in this queue.static <M extends EventMessage>
JpaSequencedDeadLetterQueue.Builder<M> builder()Creates a new builder, capable of building aJpaSequencedDeadLetterQueueaccording to the provided configuration.clear(@Nullable ProcessingContext context) Clears out alldead letterspresent in this queue.contains(Object sequenceIdentifier, @Nullable ProcessingContext context) Check whether there's a sequence ofdead lettersfor the givensequenceIdentifier.CompletableFuture<Iterable<Iterable<DeadLetter<? extends M>>>> deadLetters(@Nullable ProcessingContext context) Return alldead lettersequences held by this queue.CompletableFuture<Iterable<DeadLetter<? extends M>>> deadLetterSequence(Object sequenceIdentifier, @Nullable ProcessingContext context) Return all thedead lettersfor the givensequenceIdentifierin insert order.enqueue(Object sequenceIdentifier, DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Enqueues adead lettercontaining an implementation ofMto this queue.evict(DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Evict the givenletterfrom this queue.isFull(Object sequenceIdentifier, @Nullable ProcessingContext context) Validates whether this queue is full for the givensequenceIdentifier.process(Function<DeadLetter<? extends M>, CompletableFuture<EnqueueDecision<M>>> processingTask, @Nullable ProcessingContext context) Process a single sequence of enqueueddead letterswith the givenprocessingTask.process(Predicate<DeadLetter<? extends M>> sequenceFilter, Function<DeadLetter<? extends M>, CompletableFuture<EnqueueDecision<M>>> processingTask, @Nullable ProcessingContext context) Process a single sequence of enqueueddead lettersthrough the givenprocessingTaskmatching thesequenceFilter.requeue(DeadLetter<? extends M> letter, UnaryOperator<DeadLetter<? extends M>> letterUpdater, @Nullable ProcessingContext context) Reenters the givenletter, updating the contents with theletterUpdater.sequenceSize(Object sequenceIdentifier, @Nullable ProcessingContext context) Returns the number of dead letters for the sequence matching the givensequenceIdentifiercontained in this queue.size(@Nullable ProcessingContext context) Returns the number of dead letters contained in this queue.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.axonframework.messaging.deadletter.SequencedDeadLetterQueue
enqueueIfPresent
-
Constructor Details
-
JpaSequencedDeadLetterQueue
Instantiate a JPASequencedDeadLetterQueuebased on the givenbuilder.- Parameters:
builder- TheJpaSequencedDeadLetterQueue.Builderused to instantiate aJpaSequencedDeadLetterQueueinstance.
-
-
Method Details
-
builder
Creates a new builder, capable of building aJpaSequencedDeadLetterQueueaccording to the provided configuration. Note that theJpaSequencedDeadLetterQueue.Builder.processingGroup(String),JpaSequencedDeadLetterQueue.Builder.transactionalExecutorProvider(TransactionalExecutorProvider),JpaSequencedDeadLetterQueue.Builder.eventConverter(EventConverter), andJpaSequencedDeadLetterQueue.Builder.genericConverter(Converter)are mandatory for the queue to be constructed.- Type Parameters:
M- An implementation ofMessagecontained in thedead-letterswithin this queue.- Returns:
- The builder
-
enqueue
public CompletableFuture<Void> enqueue(Object sequenceIdentifier, DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueEnqueues adead lettercontaining an implementation ofMto this queue.The
dead letterwill be appended to a sequence depending on thesequenceIdentifier. If there is no sequence yet, it will construct one.- Specified by:
enqueuein interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
sequenceIdentifier- The identifier of the sequence theletterbelongs to.letter- TheDeadLetterto enqueue.context- the processing context within which to enqueue the givenletter- Returns:
- A
CompletableFuturethat completes when the operation is done. The future completes exceptionally with aDeadLetterQueueOverflowExceptionwhen this queueis full.
-
evict
public CompletableFuture<Void> evict(DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueEvict the givenletterfrom this queue. Nothing happens if thedead letterdoes not exist in this queue.- Specified by:
evictin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
letter- Thedead letterto evict from this queue.context- the processing context within which to evict the givenletter- Returns:
- A
CompletableFuturethat completes when the eviction is done.
-
requeue
public CompletableFuture<Void> requeue(DeadLetter<? extends M> letter, UnaryOperator<DeadLetter<? extends M>> letterUpdater, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReenters the givenletter, updating the contents with theletterUpdater. This method should be invoked ifprocessingdecided to keep the letter in the queue.This operation adjusts the
DeadLetter.lastTouched(). It may adjust theDeadLetter.cause()andDeadLetter.diagnostics(), depending on the givenletterUpdater.- Specified by:
requeuein interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
letter- Thedead letterto reenter in this queue.letterUpdater- Alambdataking in the givenletterand updating the entry for requeueing. This may adjust theDeadLetter.cause()andDeadLetter.diagnostics(), for example.context- the processing context within which to requeue the givenletter- Returns:
- A
CompletableFuturethat completes when the requeue is done. The future completes exceptionally with aNoSuchDeadLetterExceptionif the givenletterdoes not exist in the queue.
-
contains
public CompletableFuture<Boolean> contains(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueCheck whether there's a sequence ofdead lettersfor the givensequenceIdentifier.- Specified by:
containsin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
sequenceIdentifier- The identifier used to validate for containeddead lettersinstances.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewithtrueif there aredead letterspresent for the givensequenceIdentifier,falseotherwise.
-
deadLetterSequence
public CompletableFuture<Iterable<DeadLetter<? extends M>>> deadLetterSequence(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReturn all thedead lettersfor the givensequenceIdentifierin insert order.- Specified by:
deadLetterSequencein interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
sequenceIdentifier- The identifier of the sequence ofdead lettersto return.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith all thedead lettersfor the givensequenceIdentifierin insert order.
-
deadLetters
public CompletableFuture<Iterable<Iterable<DeadLetter<? extends M>>>> deadLetters(@Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReturn alldead lettersequences held by this queue. The sequences are not necessarily returned in insert order.- Specified by:
deadLettersin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith alldead lettersequences held by this queue.
-
isFull
public CompletableFuture<Boolean> isFull(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueValidates whether this queue is full for the givensequenceIdentifier.This method returns
trueeither when the maximum amount of sequences or the maximum sequence size is reached.- Specified by:
isFullin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
sequenceIdentifier- The identifier of the sequence to validate for.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewithtrueeither when the limit of this queue is reached. Containsfalseotherwise.
-
process
public CompletableFuture<Boolean> process(Function<DeadLetter<? extends M>, CompletableFuture<EnqueueDecision<M>>> processingTask, @Nullable ProcessingContext context) Process a single sequence of enqueueddead letterswith the givenprocessingTask. It will pick the oldest available sequence, determined by theDeadLetter.lastTouched()field of the first entry in each sequence.Note that only a single matching sequence is processed!
Uses the
EnqueueDecisionreturned by theprocessingTaskto decide whether toSequencedDeadLetterQueue.evict(DeadLetter, ProcessingContext)orSequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator, ProcessingContext)the dead letter. TheprocessingTaskis invoked as long as letters are present in the selected sequence and the result of processing returnsfalseforEnqueueDecision.shouldEnqueue()decision. The latter means the dead letter should be evicted.This operation protects against concurrent invocations of the
processingTaskon the filtered sequence. Doing so ensures enqueued messages are handled in order.Optimized override that retrieves available sequences one at a time (page size of 1) instead of in batches. Since no
sequenceFilteris applied, every available sequence is a valid candidate, so there is no need to prefetch multiple sequences. This avoids unnecessary deserialization of dead letter entries that will never be evaluated.- Specified by:
processin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
processingTask- A function processing adead letter. Returns aCompletableFuturewith anEnqueueDecisionused to deduce whether toSequencedDeadLetterQueue.evict(DeadLetter, ProcessingContext)orSequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator, ProcessingContext)the dead letter.context- theProcessingContextin which the dead letters are processed- Returns:
- A
CompletableFuturewithtrueif an entire sequence ofdead letterswas processed successfully,falseotherwise. This means theprocessingTaskprocessed alldead lettersof a sequence and the outcome was to evict each instance. - See Also:
-
process
public CompletableFuture<Boolean> process(Predicate<DeadLetter<? extends M>> sequenceFilter, Function<DeadLetter<? extends M>, CompletableFuture<EnqueueDecision<M>>> processingTask, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueProcess a single sequence of enqueueddead lettersthrough the givenprocessingTaskmatching thesequenceFilter. It will pick the oldest available sequence, determined by theDeadLetter.lastTouched()field of the first entry in each sequence.Note that only a single matching sequence is processed! Furthermore, only the first dead letter is validated, because it is the blocker for the processing of the rest of the sequence.
Uses the
EnqueueDecisionreturned by theprocessingTaskto decide whether toSequencedDeadLetterQueue.evict(DeadLetter, ProcessingContext)orSequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator, ProcessingContext)a dead letter from the selected sequence. TheprocessingTaskis invoked as long as letters are present in the selected sequence and the result of processing returnsfalseforEnqueueDecision.shouldEnqueue()decision. The latter means the dead letter should be evicted.This operation protects against concurrent invocations of the
processingTaskon the filtered sequence. Doing so ensures enqueued messages are handled in order.- Specified by:
processin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
sequenceFilter- Alambdaselecting the sequences within this queue to process with theprocessingTask.processingTask- A function processing adead letter. Returns aCompletableFuturewith anEnqueueDecisionused to deduce whether toSequencedDeadLetterQueue.evict(DeadLetter, ProcessingContext)orSequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator, ProcessingContext)the dead letter.context- theProcessingContextin which the dead letters are processed- Returns:
- A
CompletableFuturewithtrueif an entire sequence ofdead letterswas processed successfully,falseotherwise. This means theprocessingTaskprocessed alldead lettersof a sequence and the outcome was to evict each instance.
-
clear
Description copied from interface:SequencedDeadLetterQueueClears out alldead letterspresent in this queue.- Specified by:
clearin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturethat completes when all dead letters have been cleared.
-
sequenceSize
public CompletableFuture<Long> sequenceSize(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReturns the number of dead letters for the sequence matching the givensequenceIdentifiercontained in this queue.Note that there's a window of opportunity where the size might exceed the maximum sequence size to account for concurrent usage.
- Specified by:
sequenceSizein interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
sequenceIdentifier- The identifier of the sequence to retrieve the size from.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith the number of dead letters for the sequence matching the givensequenceIdentifier.
-
size
Description copied from interface:SequencedDeadLetterQueueReturns the number of dead letters contained in this queue.- Specified by:
sizein interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith the number of dead letters contained in this queue.
-
amountOfSequences
Description copied from interface:SequencedDeadLetterQueueReturns the number of unique sequences contained in this queue.Note that there's a window of opportunity where the size might exceed the maximum amount of sequences to account for concurrent usage of this dead letter queue.
- Specified by:
amountOfSequencesin interfaceSequencedDeadLetterQueue<M extends EventMessage>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith the number of unique sequences contained in this queue.
-