E - An implementation of EventMessage contained in the dead letters within this queue.

public class JdbcSequencedDeadLetterQueue<E extends EventMessage<?>> extends Object implements SequencedDeadLetterQueue<E>
A JDBC-based implementation of the SequencedDeadLetterQueue, used for storing dead letters containing event messages durably. Use the createSchema(DeadLetterTableFactory) operation to build the table and indices required by this SequencedDeadLetterQueue, providing the desired DeadLetterTableFactory. The statements used by this queue's methods can be optimized by providing a custom DeadLetterStatementFactory.
Keeps the insertion order intact by saving an incremented index within each unique sequence, backed by the index property. Each sequence is uniquely identified by the sequence identifier, stored in the DeadLetterSchema.sequenceIdentifierColumn() field.
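The ordering scheme described above can be pictured with a small in-memory model. This is only a sketch under assumptions: `SequenceIndexSketch` and its `Entry` class are hypothetical stand-ins for table rows, and the actual queue does the equivalent bookkeeping through SQL statements rather than a list.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical in-memory stand-in for the dead-letter table; not part of the framework.
class SequenceIndexSketch {

    // One stored row: the sequence it belongs to, its index within that sequence, and a payload.
    static final class Entry {
        final String sequenceIdentifier;
        final long index;
        final String payload;

        Entry(String sequenceIdentifier, long index, String payload) {
            this.sequenceIdentifier = sequenceIdentifier;
            this.index = index;
            this.payload = payload;
        }
    }

    private final List<Entry> rows = new ArrayList<>();

    // Appends a payload using the highest index of the sequence plus one (0 for a new sequence).
    Entry enqueue(String sequenceIdentifier, String payload) {
        long nextIndex = rows.stream()
                             .filter(row -> row.sequenceIdentifier.equals(sequenceIdentifier))
                             .mapToLong(row -> row.index)
                             .max()
                             .orElse(-1L) + 1;
        Entry entry = new Entry(sequenceIdentifier, nextIndex, payload);
        rows.add(entry);
        return entry;
    }

    // Returns the payloads of one sequence ordered by index, which equals insertion order.
    List<String> sequence(String sequenceIdentifier) {
        List<String> payloads = new ArrayList<>();
        rows.stream()
            .filter(row -> row.sequenceIdentifier.equals(sequenceIdentifier))
            .sorted(Comparator.comparingLong((Entry row) -> row.index))
            .forEach(row -> payloads.add(row.payload));
        return payloads;
    }
}
```

Because the index is scoped to a sequence, letters of different sequences can interleave in storage without affecting the order in which a single sequence is read back.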
When processing an item, single execution across all applications is guaranteed by setting the processing started property, locking other processes out of the sequence for the configured claimDuration (30 seconds by default).
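A time-based claim of this kind could look roughly like the following. The class `ClaimSketch` is hypothetical and keeps the processing-started timestamp in memory, whereas the queue stores it in the database. Treating a claim as expired exactly at the end of the claim duration, and releasing it explicitly after processing, are assumptions of this sketch.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical model of a single sequence's claim state; not part of the framework.
class ClaimSketch {

    private final Duration claimDuration;
    // Null while nobody is processing the sequence.
    private Instant processingStarted;

    ClaimSketch(Duration claimDuration) {
        this.claimDuration = claimDuration;
    }

    // Tries to claim the sequence at the given time; returns false while another claim is still valid.
    synchronized boolean tryClaim(Instant now) {
        boolean unclaimed = processingStarted == null;
        // Assumption: a claim counts as expired once the full claim duration has passed.
        boolean expired = !unclaimed && !processingStarted.plus(claimDuration).isAfter(now);
        if (unclaimed || expired) {
            processingStarted = now;
            return true;
        }
        return false;
    }

    // Releases the claim once processing has finished.
    synchronized void release() {
        processingStarted = null;
    }
}
```

The expiry branch is what keeps a sequence from staying locked forever when the process holding the claim crashes before it can release it.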
The stored entries are converted to a JdbcDeadLetter when they need to be processed or filtered. In order to restore the original EventMessage, the DeadLetterJdbcConverter is used. The default supports all EventMessage implementations provided by the framework. If you have a custom variant, you have to build your own.
Upcasters are not supported by this implementation, so breaking changes for event messages stored in the queue should be avoided.
Modifier and Type | Class and Description |
---|---|
static class | JdbcSequencedDeadLetterQueue.Builder<E extends EventMessage<?>> - Builder class to instantiate a JdbcSequencedDeadLetterQueue. |
Modifier | Constructor and Description |
---|---|
protected | JdbcSequencedDeadLetterQueue(JdbcSequencedDeadLetterQueue.Builder<E> builder) - Instantiate a JDBC-based SequencedDeadLetterQueue through the given builder. |
Modifier and Type | Method and Description |
---|---|
long | amountOfSequences() - Returns the number of unique sequences contained in this queue. |
static <E extends EventMessage<?>> JdbcSequencedDeadLetterQueue.Builder<E> | builder() - Instantiate a builder to construct a JdbcSequencedDeadLetterQueue. |
void | clear() - Clears out all dead letters present in this queue. |
boolean | contains(Object sequenceIdentifier) - Check whether there's a sequence of dead letters for the given sequenceIdentifier. |
void | createSchema(DeadLetterTableFactory tableFactory) - Performs the DDL queries to create the schema necessary for this SequencedDeadLetterQueue implementation. |
Iterable<Iterable<DeadLetter<? extends E>>> | deadLetters() - Return all dead letter sequences held by this queue. |
Iterable<DeadLetter<? extends E>> | deadLetterSequence(Object sequenceIdentifier) - Return all the dead letters for the given sequenceIdentifier in insert order. |
void | enqueue(Object sequenceIdentifier, DeadLetter<? extends E> letter) - Enqueues a dead letter containing an implementation of E to this queue. |
void | evict(DeadLetter<? extends E> letter) - Evict the given letter from this queue. |
boolean | isFull(Object sequenceIdentifier) - Validates whether this queue is full for the given sequenceIdentifier. |
boolean | process(Function<DeadLetter<? extends E>,EnqueueDecision<E>> processingTask) - Process a sequence of enqueued dead letters with the given processingTask. |
boolean | process(Predicate<DeadLetter<? extends E>> sequenceFilter, Function<DeadLetter<? extends E>,EnqueueDecision<E>> processingTask) - Process a sequence of enqueued dead letters through the given processingTask matching the sequenceFilter. |
void | requeue(DeadLetter<? extends E> letter, UnaryOperator<DeadLetter<? extends E>> letterUpdater) - Reenters the given letter, updating the contents with the letterUpdater. |
long | sequenceSize(Object sequenceIdentifier) - Returns the number of dead letters for the sequence matching the given sequenceIdentifier contained in this queue. |
long | size() - Returns the number of dead letters contained in this queue. |
Methods inherited from class Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
Methods inherited from interface SequencedDeadLetterQueue: enqueueIfPresent
protected JdbcSequencedDeadLetterQueue(JdbcSequencedDeadLetterQueue.Builder<E> builder)
Instantiate a JDBC-based SequencedDeadLetterQueue through the given builder.
Will validate whether the processing group, ConnectionProvider, TransactionManager, DeadLetterStatementFactory and DeadLetterJdbcConverter are set. If any of these is not set, an AxonConfigurationException is thrown.
builder - The JdbcSequencedDeadLetterQueue.Builder used to instantiate a JdbcSequencedDeadLetterQueue instance.

public static <E extends EventMessage<?>> JdbcSequencedDeadLetterQueue.Builder<E> builder()
Instantiate a builder to construct a JdbcSequencedDeadLetterQueue.
The following defaults are set by the builder:

- The table's schema defaults to DeadLetterSchema.defaultSchema().
- The maximum amount of sequences defaults to 1024.
- The maximum sequence size defaults to 1024.
- The page size defaults to 100.
- The claim duration defaults to 30 seconds.

The processing group, ConnectionProvider, and TransactionManager are hard requirements and should be provided.
The JdbcSequencedDeadLetterQueue.Builder.statementFactory(DeadLetterStatementFactory) and JdbcSequencedDeadLetterQueue.Builder.converter(DeadLetterJdbcConverter) are also hard requirements, but users can choose to either set both explicitly or rely on the DefaultDeadLetterStatementFactory and DefaultDeadLetterJdbcConverter constructed through the generic Serializer and event Serializer.
E - The type of EventMessage maintained in the dead letter of this SequencedDeadLetterQueue.
Returns a builder that can construct a JdbcSequencedDeadLetterQueue.

public void createSchema(DeadLetterTableFactory tableFactory)
Performs the DDL queries to create the schema necessary for this SequencedDeadLetterQueue implementation.
tableFactory - The factory constructing the PreparedStatement to construct a DeadLetter entry table based on the configured DeadLetterSchema.

public void enqueue(@Nonnull Object sequenceIdentifier, @Nonnull DeadLetter<? extends E> letter) throws DeadLetterQueueOverflowException
Enqueues a dead letter containing an implementation of E to this queue.
The dead letter will be appended to a sequence depending on the sequenceIdentifier. If there is no sequence yet, it will construct one.
Specified by: enqueue in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
sequenceIdentifier - The identifier of the sequence the letter belongs to.
letter - The DeadLetter to enqueue.
DeadLetterQueueOverflowException - Thrown when this queue is full.

public void evict(DeadLetter<? extends E> letter)
Evict the given letter from this queue. Nothing happens if the dead letter does not exist in this queue.
Specified by: evict in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
letter - The dead letter to evict from this queue.

public void requeue(@Nonnull DeadLetter<? extends E> letter, @Nonnull UnaryOperator<DeadLetter<? extends E>> letterUpdater) throws NoSuchDeadLetterException
Reenters the given letter, updating the contents with the letterUpdater. This method should be invoked if processing decided to keep the letter in the queue.
This operation adjusts the DeadLetter.lastTouched(). It may adjust the DeadLetter.cause() and DeadLetter.diagnostics(), depending on the given letterUpdater.
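The interplay between the letterUpdater and the last-touched timestamp can be sketched with simplified types. `LetterSketch` and `RequeueSketch` are hypothetical stand-ins, with plain strings in place of the framework's cause and diagnostics types. The sketch assumes the updater runs first and the timestamp is set afterwards.

```java
import java.time.Instant;
import java.util.function.UnaryOperator;

// Hypothetical, simplified dead letter; the framework's DeadLetter carries more state.
final class LetterSketch {
    final String cause;
    final String diagnostics;
    final Instant lastTouched;

    LetterSketch(String cause, String diagnostics, Instant lastTouched) {
        this.cause = cause;
        this.diagnostics = diagnostics;
        this.lastTouched = lastTouched;
    }

    // Returns a copy with a different cause, leaving this instance untouched.
    LetterSketch withCause(String newCause) {
        return new LetterSketch(newCause, diagnostics, lastTouched);
    }

    // Returns a copy with different diagnostics, leaving this instance untouched.
    LetterSketch withDiagnostics(String newDiagnostics) {
        return new LetterSketch(cause, newDiagnostics, lastTouched);
    }
}

// Hypothetical requeue step; not the framework's implementation.
class RequeueSketch {

    // Applies the caller's updater first, then stamps the time of the requeue.
    static LetterSketch requeue(LetterSketch letter,
                                UnaryOperator<LetterSketch> letterUpdater,
                                Instant now) {
        LetterSketch updated = letterUpdater.apply(letter);
        return new LetterSketch(updated.cause, updated.diagnostics, now);
    }
}
```

A caller would pass something like `letter -> letter.withDiagnostics("retries=1")` to record a retry count, while the timestamp changes regardless of what the updater does.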
Specified by: requeue in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
letter - The dead letter to reenter in this queue.
letterUpdater - A lambda taking in the given letter and updating the entry for requeueing. This may adjust the DeadLetter.cause() and DeadLetter.diagnostics(), for example.
NoSuchDeadLetterException - Thrown if the given letter does not exist in the queue.

public boolean contains(@Nonnull Object sequenceIdentifier)
Check whether there's a sequence of dead letters for the given sequenceIdentifier.
Specified by: contains in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
sequenceIdentifier - The identifier used to validate for contained dead letter instances.
Returns true if there are dead letters present for the given sequenceIdentifier, false otherwise.

public Iterable<DeadLetter<? extends E>> deadLetterSequence(@Nonnull Object sequenceIdentifier)
Return all the dead letters for the given sequenceIdentifier in insert order.
Specified by: deadLetterSequence in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
sequenceIdentifier - The identifier of the sequence of dead letters to return.
Returns all the dead letters for the given sequenceIdentifier in insert order.

public Iterable<Iterable<DeadLetter<? extends E>>> deadLetters()
Return all dead letter sequences held by this queue. The sequences are not necessarily returned in insert order.
Specified by: deadLetters in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
Returns all dead letter sequences held by this queue.

public boolean isFull(@Nonnull Object sequenceIdentifier)
Validates whether this queue is full for the given sequenceIdentifier.
This method returns true when either the maximum amount of sequences or the maximum sequence size is reached.
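One way to read these two limits is shown in the following sketch. `CapacitySketch` is a hypothetical in-memory model. It assumes, without confirmation from this page, that an existing sequence is checked against the maximum sequence size while an unknown identifier is checked against the maximum amount of sequences.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical capacity check over in-memory counters; not the framework's SQL-based implementation.
class CapacitySketch {

    private final int maxSequences;
    private final int maxSequenceSize;
    // Number of letters currently stored per sequence identifier.
    private final Map<String, Integer> sizes = new HashMap<>();

    CapacitySketch(int maxSequences, int maxSequenceSize) {
        this.maxSequences = maxSequences;
        this.maxSequenceSize = maxSequenceSize;
    }

    // Assumption: an existing sequence is limited by its own size, a new one by the sequence count.
    boolean isFull(String sequenceIdentifier) {
        int size = sizes.getOrDefault(sequenceIdentifier, 0);
        return size > 0 ? size >= maxSequenceSize : sizes.size() >= maxSequences;
    }

    // Adds one letter, refusing when the queue is full for this identifier.
    void enqueue(String sequenceIdentifier) {
        if (isFull(sequenceIdentifier)) {
            throw new IllegalStateException("Queue is full for sequence " + sequenceIdentifier);
        }
        sizes.merge(sequenceIdentifier, 1, Integer::sum);
    }
}
```

Under this reading, a queue that has reached its sequence limit can still accept letters for sequences it already holds, as long as those sequences have room.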
Specified by: isFull in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
sequenceIdentifier - The identifier of the sequence to validate for.
Returns true when the limit of this queue is reached, false otherwise.

public long size()
Returns the number of dead letters contained in this queue.
Specified by: size in interface SequencedDeadLetterQueue<E extends EventMessage<?>>

public long sequenceSize(@Nonnull Object sequenceIdentifier)
Returns the number of dead letters for the sequence matching the given sequenceIdentifier contained in this queue.
Note that there's a window of opportunity where the size might exceed the maximum sequence size to accommodate concurrent usage.
Specified by: sequenceSize in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
sequenceIdentifier - The identifier of the sequence to retrieve the size from.
Returns the number of dead letters for the sequence matching the given sequenceIdentifier.

public long amountOfSequences()
Returns the number of unique sequences contained in this queue.
Note that there's a window of opportunity where the size might exceed the maximum amount of sequences to accommodate concurrent usage of this dead letter queue.
Specified by: amountOfSequences in interface SequencedDeadLetterQueue<E extends EventMessage<?>>

public boolean process(@Nonnull Predicate<DeadLetter<? extends E>> sequenceFilter, @Nonnull Function<DeadLetter<? extends E>,EnqueueDecision<E>> processingTask)
Process a sequence of enqueued dead letters through the given processingTask matching the sequenceFilter. Will pick the oldest available sequence based on the DeadLetter.lastTouched() field from every sequence's first entry.
Note that only a single matching sequence is processed! Furthermore, only the first dead letter is validated, because it is the blocker for the processing of the rest of the sequence.
Uses the EnqueueDecision returned by the processingTask to decide whether to SequencedDeadLetterQueue.evict(DeadLetter) or SequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator) a dead letter from the selected sequence. The processingTask is invoked as long as letters are present in the selected sequence and EnqueueDecision.shouldEnqueue() returns false for the resulting decision. The latter means the dead letter should be evicted.
This operation protects against concurrent invocations of the processingTask on the filtered sequence. Doing so ensures enqueued messages are handled in order.
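The evict-or-requeue loop described above can be sketched over an in-memory sequence. `ProcessLoopSketch` is hypothetical: it takes a sequence that was already selected and claimed, uses plain strings as letters, and replaces the EnqueueDecision with a predicate whose result plays the role of EnqueueDecision.shouldEnqueue().

```java
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical model of processing one already-selected sequence; not the framework's implementation.
class ProcessLoopSketch {

    // The predicate stands in for the processing task: true keeps the letter, false evicts it.
    // Returns true only if every letter of a non-empty sequence was evicted.
    static boolean process(Deque<String> sequence, Predicate<String> processingTask) {
        if (sequence.isEmpty()) {
            // Nothing was selected, so no sequence was processed successfully.
            return false;
        }
        while (!sequence.isEmpty()) {
            String letter = sequence.peekFirst();
            boolean shouldEnqueue = processingTask.test(letter);
            if (shouldEnqueue) {
                // Requeue: the letter stays at the head and blocks the rest of the sequence.
                return false;
            }
            // Evict: the letter was handled, continue with the next one.
            sequence.removeFirst();
        }
        return true;
    }
}
```

Stopping at the first letter that must stay is what preserves ordering: later letters of the same sequence are never handled ahead of one that is still failing.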
Specified by: process in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
sequenceFilter - A lambda selecting the sequences within this queue to process with the processingTask.
processingTask - A function processing a dead letter. Returns an EnqueueDecision used to deduce whether to SequencedDeadLetterQueue.evict(DeadLetter) or SequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator) the dead letter.
Returns true if an entire sequence of dead letters was processed successfully, false otherwise. This means the processingTask processed all dead letters of a sequence and the outcome was to evict each instance.

public boolean process(@Nonnull Function<DeadLetter<? extends E>,EnqueueDecision<E>> processingTask)
Process a sequence of enqueued dead letters with the given processingTask. Will pick the oldest available sequence based on the DeadLetter.lastTouched() field from every sequence's first entry.
Note that only a single matching sequence is processed!
Uses the EnqueueDecision returned by the processingTask to decide whether to SequencedDeadLetterQueue.evict(DeadLetter) or SequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator) the dead letter. The processingTask is invoked as long as letters are present in the selected sequence and EnqueueDecision.shouldEnqueue() returns false for the resulting decision. The latter means the dead letter should be evicted.
This operation protects against concurrent invocations of the processingTask on the selected sequence. Doing so ensures enqueued messages are handled in order.
Specified by: process in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
processingTask - A function processing a dead letter. Returns an EnqueueDecision used to deduce whether to SequencedDeadLetterQueue.evict(DeadLetter) or SequencedDeadLetterQueue.requeue(DeadLetter, UnaryOperator) the dead letter.
Returns true if an entire sequence of dead letters was processed successfully, false otherwise. This means the processingTask processed all dead letters of a sequence and the outcome was to evict each instance.

public void clear()
Clears out all dead letters present in this queue.
Specified by: clear in interface SequencedDeadLetterQueue<E extends EventMessage<?>>
Copyright © 2010–2024. All rights reserved.