Class CachingSequencedDeadLetterQueue<M extends Message>
- Type Parameters:
M- The type ofMessagecontained in thedead letterswithin this queue.
- All Implemented Interfaces:
SequencedDeadLetterQueue<M>
SequencedDeadLetterQueue that adds per-segment caching of sequence identifiers to optimize
contains(Object, ProcessingContext) lookups. This is particularly important for high-throughput event
processing where checking if an event's sequence is already dead-lettered should be as fast as possible.
Each Segment gets its own independent SequenceIdentifierCache, obtained from the
ProcessingContext. When a segment is released, only that segment's cache is removed via
invalidateCache(ProcessingContext), leaving other segments' caches intact.
If no segment is available in the ProcessingContext (or the context is null), operations delegate
directly to the underlying queue without caching.
Thread-safety note: This class is not thread-safe when performing operations for the same sequence identifier
concurrently. It is designed for internal use by DeadLetteringEventHandlingComponent, where operations on a
given sequence identifier are already serialized by the upstream
SequencingEventHandlingComponent.
Different segments can operate concurrently without interference. External synchronization must be provided if this
class is used in other contexts where concurrent access to the same sequence identifier is possible.
- Since:
- 5.1.0
- Author:
- Mateusz Nowak
- See Also:
-
Constructor Summary
ConstructorsConstructorDescriptionConstructs a caching decorator with the given delegate and default cache settings.CachingSequencedDeadLetterQueue(SequencedDeadLetterQueue<M> delegate, int cacheMaxSize) Constructs a caching decorator with the given delegate and cache max size. -
Method Summary
Modifier and TypeMethodDescriptionamountOfSequences(@Nullable ProcessingContext context) Returns the number of unique sequences contained in this queue.intReturns the total size of identifiers cached as enqueued, aggregated across all segments.intReturns the total size of identifiers cached as not enqueued, aggregated across all segments.clear(@Nullable ProcessingContext context) Clears out alldead letterspresent in this queue.contains(Object sequenceIdentifier, @Nullable ProcessingContext context) Check whether there's a sequence ofdead lettersfor the givensequenceIdentifier.CompletableFuture<Iterable<Iterable<DeadLetter<? extends M>>>> deadLetters(@Nullable ProcessingContext context) Return alldead lettersequences held by this queue.CompletableFuture<Iterable<DeadLetter<? extends M>>> deadLetterSequence(Object sequenceIdentifier, @Nullable ProcessingContext context) Return all thedead lettersfor the givensequenceIdentifierin insert order.enqueue(Object sequenceIdentifier, DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Enqueues adead lettercontaining an implementation ofMto this queue.enqueueIfPresent(Object sequenceIdentifier, Supplier<DeadLetter<? extends M>> letterBuilder, @Nullable ProcessingContext context) Enqueue the result of the givenletterBuilderonly if there already are otherdead letterswith the samesequenceIdentifierpresent in this queue.evict(DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Evict the givenletterfrom this queue.voidinvalidateCache(@Nullable ProcessingContext context) Invalidates the sequence identifier cache for the segment found in the givenProcessingContext.isFull(Object sequenceIdentifier, @Nullable ProcessingContext context) Validates whether this queue is full for the givensequenceIdentifier.process(Predicate<DeadLetter<? extends M>> sequenceFilter, Function<DeadLetter<? extends M>, CompletableFuture<EnqueueDecision<M>>> processingTask, @Nullable ProcessingContext context) Process a single sequence of enqueueddead lettersthrough the givenprocessingTaskmatching thesequenceFilter.requeue(DeadLetter<? extends M> letter, UnaryOperator<DeadLetter<? extends M>> letterUpdater, @Nullable ProcessingContext context) Reenters the givenletter, updating the contents with theletterUpdater.sequenceSize(Object sequenceIdentifier, @Nullable ProcessingContext context) Returns the number of dead letters for the sequence matching the givensequenceIdentifiercontained in this queue.size(@Nullable ProcessingContext context) Returns the number of dead letters contained in this queue.Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitMethods inherited from interface org.axonframework.messaging.deadletter.SequencedDeadLetterQueue
process
-
Constructor Details
-
CachingSequencedDeadLetterQueue
Constructs a caching decorator with the given delegate and default cache settings.Each segment's cache is lazily initialized on first use by checking if the delegate queue is empty. If empty, the cache operates in an optimized mode where unknown identifiers are assumed to not be present.
- Parameters:
delegate- the underlyingSequencedDeadLetterQueueto delegate to
-
CachingSequencedDeadLetterQueue
Constructs a caching decorator with the given delegate and cache max size.Each segment's cache is lazily initialized on first use by checking if the delegate queue is empty. If empty, the cache operates in an optimized mode where unknown identifiers are assumed to not be present.
- Parameters:
delegate- the underlyingSequencedDeadLetterQueueto delegate tocacheMaxSize- the maximum size of the non-enqueued identifiers cache
-
-
Method Details
-
enqueue
public CompletableFuture<Void> enqueue(Object sequenceIdentifier, DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueEnqueues adead lettercontaining an implementation ofMto this queue.The
dead letterwill be appended to a sequence depending on thesequenceIdentifier. If there is no sequence yet, it will construct one.- Specified by:
enqueuein interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceIdentifier- The identifier of the sequence theletterbelongs to.letter- TheDeadLetterto enqueue.context- the processing context within which to enqueue the givenletter- Returns:
- A
CompletableFuturethat completes when the operation is done. The future completes exceptionally with aDeadLetterQueueOverflowExceptionwhen this queueis full.
-
enqueueIfPresent
public CompletableFuture<Boolean> enqueueIfPresent(Object sequenceIdentifier, Supplier<DeadLetter<? extends M>> letterBuilder, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueEnqueue the result of the givenletterBuilderonly if there already are otherdead letterswith the samesequenceIdentifierpresent in this queue.- Specified by:
enqueueIfPresentin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceIdentifier- The identifier of the sequence to store the result of theletterBuilderin.letterBuilder- TheDeadLetterbuilder constructing the letter to enqueue. Only invoked if the givensequenceIdentifieris contained.context- the processing context within which to enqueue the result of the givenletterBuilderif there is a sequence for the givensequenceIdentifier- Returns:
- A
CompletableFuturewithtrueif there aredead lettersfor the givensequenceIdentifierand thus theletterBuilder'soutcome is inserted. Otherwise, the future completes withfalse. The future completes exceptionally with aDeadLetterQueueOverflowExceptionwhen this queue isSequencedDeadLetterQueue.isFull(Object, ProcessingContext)for the givensequenceIdentifier.
-
evict
public CompletableFuture<Void> evict(DeadLetter<? extends M> letter, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueEvict the givenletterfrom this queue. Nothing happens if thedead letterdoes not exist in this queue.- Specified by:
evictin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
letter- Thedead letterto evict from this queue.context- the processing context within which to evict the givenletter- Returns:
- A
CompletableFuturethat completes when the eviction is done.
-
requeue
public CompletableFuture<Void> requeue(DeadLetter<? extends M> letter, UnaryOperator<DeadLetter<? extends M>> letterUpdater, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReenters the givenletter, updating the contents with theletterUpdater. This method should be invoked ifprocessingdecided to keep the letter in the queue.This operation adjusts the
DeadLetter.lastTouched(). It may adjust theDeadLetter.cause()andDeadLetter.diagnostics(), depending on the givenletterUpdater.- Specified by:
requeuein interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
letter- Thedead letterto reenter in this queue.letterUpdater- Alambdataking in the givenletterand updating the entry for requeueing. This may adjust theDeadLetter.cause()andDeadLetter.diagnostics(), for example.context- the processing context within which to requeue the givenletter- Returns:
- A
CompletableFuturethat completes when the requeue is done. The future completes exceptionally with aNoSuchDeadLetterExceptionif the givenletterdoes not exist in the queue.
-
contains
public CompletableFuture<Boolean> contains(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueCheck whether there's a sequence ofdead lettersfor the givensequenceIdentifier.- Specified by:
containsin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceIdentifier- The identifier used to validate for containeddead lettersinstances.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewithtrueif there aredead letterspresent for the givensequenceIdentifier,falseotherwise.
-
deadLetterSequence
public CompletableFuture<Iterable<DeadLetter<? extends M>>> deadLetterSequence(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReturn all thedead lettersfor the givensequenceIdentifierin insert order.- Specified by:
deadLetterSequencein interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceIdentifier- The identifier of the sequence ofdead lettersto return.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith all thedead lettersfor the givensequenceIdentifierin insert order.
-
deadLetters
public CompletableFuture<Iterable<Iterable<DeadLetter<? extends M>>>> deadLetters(@Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReturn alldead lettersequences held by this queue. The sequences are not necessarily returned in insert order.- Specified by:
deadLettersin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith alldead lettersequences held by this queue.
-
isFull
public CompletableFuture<Boolean> isFull(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueValidates whether this queue is full for the givensequenceIdentifier.This method returns
trueeither when the maximum amount of sequences or the maximum sequence size is reached.- Specified by:
isFullin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceIdentifier- The identifier of the sequence to validate for.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewithtrueeither when the limit of this queue is reached. Containsfalseotherwise.
-
size
Description copied from interface:SequencedDeadLetterQueueReturns the number of dead letters contained in this queue.- Specified by:
sizein interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith the number of dead letters contained in this queue.
-
sequenceSize
public CompletableFuture<Long> sequenceSize(Object sequenceIdentifier, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueReturns the number of dead letters for the sequence matching the givensequenceIdentifiercontained in this queue.Note that there's a window of opportunity where the size might exceed the maximum sequence size to account for concurrent usage.
- Specified by:
sequenceSizein interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceIdentifier- The identifier of the sequence to retrieve the size from.context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith the number of dead letters for the sequence matching the givensequenceIdentifier.
-
amountOfSequences
Description copied from interface:SequencedDeadLetterQueueReturns the number of unique sequences contained in this queue.Note that there's a window of opportunity where the size might exceed the maximum amount of sequences to account for concurrent usage of this dead letter queue.
- Specified by:
amountOfSequencesin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturewith the number of unique sequences contained in this queue.
-
process
public CompletableFuture<Boolean> process(Predicate<DeadLetter<? extends M>> sequenceFilter, Function<DeadLetter<? extends M>, CompletableFuture<EnqueueDecision<M>>> processingTask, @Nullable ProcessingContext context) Description copied from interface:SequencedDeadLetterQueueProcess a single sequence of enqueueddead lettersthrough the givenprocessingTaskmatching thesequenceFilter. It will pick the oldest available sequence, determined by theDeadLetter.lastTouched()field of the first entry in each sequence.Note that only a single matching sequence is processed! Furthermore, only the first dead letter is validated, because it is the blocker for the processing of the rest of the sequence.
Uses the
EnqueueDecisionreturned by theprocessingTaskto decide whether toSequencedDeadLetterQueue.evict(DeadLetter, ProcessingContext)orSequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator, ProcessingContext)a dead letter from the selected sequence. TheprocessingTaskis invoked as long as letters are present in the selected sequence and the result of processing returnsfalseforEnqueueDecision.shouldEnqueue()decision. The latter means the dead letter should be evicted.This operation protects against concurrent invocations of the
processingTaskon the filtered sequence. Doing so ensures enqueued messages are handled in order.- Specified by:
processin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
sequenceFilter- Alambdaselecting the sequences within this queue to process with theprocessingTask.processingTask- A function processing adead letter. Returns aCompletableFuturewith anEnqueueDecisionused to deduce whether toSequencedDeadLetterQueue.evict(DeadLetter, ProcessingContext)orSequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator, ProcessingContext)the dead letter.context- theProcessingContextin which the dead letters are processed- Returns:
- A
CompletableFuturewithtrueif an entire sequence ofdead letterswas processed successfully,falseotherwise. This means theprocessingTaskprocessed alldead lettersof a sequence and the outcome was to evict each instance.
-
clear
Clears out alldead letterspresent in this queue.Clears all per-segment caches and the delegate queue.
- Specified by:
clearin interfaceSequencedDeadLetterQueue<M extends Message>- Parameters:
context- theProcessingContextwherein this operation is invoked, if any- Returns:
- A
CompletableFuturethat completes when all dead letters have been cleared.
-
invalidateCache
Invalidates the sequence identifier cache for the segment found in the givenProcessingContext.The segment is extracted from the context internally. If no segment is present in the context, this method is a no-op.
Call this method when processing ownership changes (e.g., segment release) to ensure cache consistency. When ownership changes, another processor instance may have modified the queue, making cached information potentially stale.
- Parameters:
context- the processing context containing theSegmentto invalidate the cache for, may benull
-
cacheEnqueuedSize
public int cacheEnqueuedSize()Returns the total size of identifiers cached as enqueued, aggregated across all segments.- Returns:
- the total count of enqueued identifiers across all segment caches
-
cacheNonEnqueuedSize
public int cacheNonEnqueuedSize()Returns the total size of identifiers cached as not enqueued, aggregated across all segments.- Returns:
- the total count of non-enqueued identifiers across all segment caches
-