public class JpaEventStorageEngine extends BatchingEventStorageEngine
By default, the payload of events is stored as a serialized blob of bytes. Other columns store metadata that allows the DomainEvents of a specific aggregate to be found quickly and in the correct order.
| Modifier and Type | Class | Description |
|---|---|---|
| static class | JpaEventStorageEngine.Builder | Builder class to instantiate a JpaEventStorageEngine. |

Fields inherited from class AbstractEventStorageEngine: upcasterChain

| Modifier | Constructor | Description |
|---|---|---|
| protected | JpaEventStorageEngine(JpaEventStorageEngine.Builder builder) | Instantiate a JpaEventStorageEngine based on the fields contained in the JpaEventStorageEngine.Builder. |

| Modifier and Type | Method | Description |
|---|---|---|
| protected void | appendEvents(List<? extends EventMessage<?>> events, Serializer serializer) | Append given events to the backing database. |
| protected static <T> DomainEventMessage<T> | asDomainEventMessage(EventMessage<T> event) | Converts an EventMessage to a DomainEventMessage. |
| static JpaEventStorageEngine.Builder | builder() | Instantiate a Builder to be able to create a JpaEventStorageEngine. |
| protected Object | createEventEntity(EventMessage<?> eventMessage, Serializer serializer) | Returns a Jpa event entity for given eventMessage. |
| TrackingToken | createHeadToken() | Creates a token that is at the head of an event stream - that tracks all new events. |
| protected Object | createSnapshotEntity(DomainEventMessage<?> snapshot, Serializer serializer) | Returns a Jpa snapshot entity for given snapshot of an aggregate. |
| TrackingToken | createTailToken() | Creates a token that is at the tail of an event stream - that tracks events from the beginning of time. |
| TrackingToken | createTokenAt(Instant dateTime) | Creates a token that tracks all events after given dateTime. |
| protected void | deleteSnapshots(String aggregateIdentifier, long sequenceNumber) | Deletes all snapshots from the underlying storage with given aggregateIdentifier. |
| protected String | domainEventEntryEntityName() | Returns the name of the Jpa event entity. |
| protected jakarta.persistence.EntityManager | entityManager() | Provides an EntityManager instance for storing and fetching event data. |
| protected List<? extends DomainEventData<?>> | fetchDomainEvents(String aggregateIdentifier, long firstSequenceNumber, int batchSize) | Returns a batch of events published by an aggregate with given aggregateIdentifier. |
| protected List<Object[]> | fetchEvents(GapAwareTrackingToken token) | Returns a batch of event data as object entries in the event storage with a global index greater than the given token. |
| protected List<? extends TrackedEventData<?>> | fetchTrackedEvents(TrackingToken lastToken, int batchSize) | Returns a batch of serialized event data entries in the event storage that have a TrackingToken greater than the given lastToken. |
| Optional<Long> | lastSequenceNumberFor(String aggregateIdentifier) | Returns the last known sequence number for the given aggregateIdentifier. |
| protected Stream<? extends DomainEventData<?>> | readSnapshotData(String aggregateIdentifier) | Returns a stream of serialized event entries for given aggregateIdentifier if the backing database contains a snapshot of the aggregate. |
| void | setGapCleaningThreshold(int gapCleaningThreshold) | Sets the threshold of number of gaps in a token before an attempt to clean gaps up is taken. |
| void | setGapTimeout(int gapTimeout) | Sets the amount of time until a 'gap' in a TrackingToken may be considered timed out. |
| protected String | snapshotEventEntryEntityName() | Returns the name of the Snapshot event entity. |
| protected void | storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer) | Store the given snapshot of an Aggregate. |

Methods inherited from class BatchingEventStorageEngine: batchSize, fetchForAggregateUntilEmpty, readEventData, readEventData

Methods inherited from class AbstractEventStorageEngine: appendEvents, getEventSerializer, getSnapshotSerializer, handlePersistenceException, readEvents, readEvents, readSnapshot, storeSnapshot

Methods inherited from class java.lang.Object: clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface EventStorageEngine: appendEvents, readEvents

protected JpaEventStorageEngine(JpaEventStorageEngine.Builder builder)

Instantiate a JpaEventStorageEngine based on the fields contained in the JpaEventStorageEngine.Builder.

Will assert that the event and snapshot Serializer, the EntityManagerProvider and TransactionManager are not null, and will throw an AxonConfigurationException if any of them is null.

Parameters:
- builder - the JpaEventStorageEngine.Builder used to instantiate a JpaEventStorageEngine instance

public static JpaEventStorageEngine.Builder builder()

Instantiate a Builder to be able to create a JpaEventStorageEngine.

The following configurable fields have defaults:
- EventUpcaster defaults to a NoOpEventUpcaster.
- PersistenceExceptionResolver defaults to a SQLErrorCodesResolver, if the DataSource is provided.
- snapshotFilter defaults to a SnapshotFilter.allowAll() instance.
- batchSize defaults to 100.
- explicitFlush defaults to true.
- maxGapOffset defaults to 10000.
- lowestGlobalSequence defaults to 1.
- gapTimeout defaults to 60000 (1 minute).
- gapCleaningThreshold defaults to 250.

The event and snapshot Serializer, the EntityManagerProvider and TransactionManager are hard requirements and as such should be provided.
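
The contract described above (optional fields with defaults, plus hard requirements that are checked at construction time) can be illustrated with a small self-contained sketch. SketchEngine and its Builder are hypothetical stand-ins and not Axon classes: plain Object fields take the place of the Serializer, EntityManagerProvider and TransactionManager, and an IllegalStateException takes the place of the AxonConfigurationException.

```java
// Hypothetical stand-in; SketchEngine is not an Axon class. Object fields stand in
// for the Serializer, EntityManagerProvider and TransactionManager types.
final class SketchEngine {
    final Object eventSerializer;
    final Object snapshotSerializer;
    final Object entityManagerProvider;
    final Object transactionManager;
    final int batchSize;
    final boolean explicitFlush;
    final int maxGapOffset;
    final long lowestGlobalSequence;
    final int gapTimeout;
    final int gapCleaningThreshold;

    private SketchEngine(Builder builder) {
        // Hard requirements are validated when the engine is constructed.
        this.eventSerializer = require(builder.eventSerializer, "event Serializer");
        this.snapshotSerializer = require(builder.snapshotSerializer, "snapshot Serializer");
        this.entityManagerProvider = require(builder.entityManagerProvider, "EntityManagerProvider");
        this.transactionManager = require(builder.transactionManager, "TransactionManager");
        this.batchSize = builder.batchSize;
        this.explicitFlush = builder.explicitFlush;
        this.maxGapOffset = builder.maxGapOffset;
        this.lowestGlobalSequence = builder.lowestGlobalSequence;
        this.gapTimeout = builder.gapTimeout;
        this.gapCleaningThreshold = builder.gapCleaningThreshold;
    }

    private static Object require(Object value, String name) {
        if (value == null) {
            // Stand-in for the AxonConfigurationException thrown by the real engine.
            throw new IllegalStateException("The " + name + " is a hard requirement");
        }
        return value;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        private Object eventSerializer;
        private Object snapshotSerializer;
        private Object entityManagerProvider;
        private Object transactionManager;
        // Defaults taken from the list above.
        private int batchSize = 100;
        private boolean explicitFlush = true;
        private int maxGapOffset = 10000;
        private long lowestGlobalSequence = 1L;
        private int gapTimeout = 60000;
        private int gapCleaningThreshold = 250;

        Builder eventSerializer(Object value) { this.eventSerializer = value; return this; }
        Builder snapshotSerializer(Object value) { this.snapshotSerializer = value; return this; }
        Builder entityManagerProvider(Object value) { this.entityManagerProvider = value; return this; }
        Builder transactionManager(Object value) { this.transactionManager = value; return this; }
        Builder batchSize(int value) { this.batchSize = value; return this; }

        SketchEngine build() {
            return new SketchEngine(this);
        }
    }
}
```

Calling `SketchEngine.builder().build()` without the four required components fails immediately, while a builder that only sets those four yields an engine carrying the listed defaults.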

Returns: a Builder to be able to create a JpaEventStorageEngine

protected static <T> DomainEventMessage<T> asDomainEventMessage(EventMessage<T> event)

Converts an EventMessage to a DomainEventMessage. If the message already is a DomainEventMessage, it is returned as is. Otherwise, a new GenericDomainEventMessage is created with a null type, an aggregateIdentifier equal to the messageIdentifier, and a sequence number of 0L.

Doing so allows using the DomainEventEntry to store both a GenericEventMessage and a GenericDomainEventMessage.
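
The conversion rule above can be sketched with simplified stand-in types. SketchEventMessage, SketchGenericEvent and SketchDomainEvent are hypothetical and only model the fields the description mentions; they are not Axon's message classes.

```java
// Hypothetical, simplified stand-ins for Axon's message types.
interface SketchEventMessage<T> {
    String identifier();
    T payload();
}

final class SketchGenericEvent<T> implements SketchEventMessage<T> {
    private final String identifier;
    private final T payload;

    SketchGenericEvent(String identifier, T payload) {
        this.identifier = identifier;
        this.payload = payload;
    }

    public String identifier() { return identifier; }
    public T payload() { return payload; }
}

final class SketchDomainEvent<T> implements SketchEventMessage<T> {
    final String type;
    final String aggregateIdentifier;
    final long sequenceNumber;
    private final String identifier;
    private final T payload;

    SketchDomainEvent(String type, String aggregateIdentifier, long sequenceNumber,
                      String identifier, T payload) {
        this.type = type;
        this.aggregateIdentifier = aggregateIdentifier;
        this.sequenceNumber = sequenceNumber;
        this.identifier = identifier;
        this.payload = payload;
    }

    public String identifier() { return identifier; }
    public T payload() { return payload; }
}

final class DomainEventConversionSketch {
    static <T> SketchDomainEvent<T> asDomainEvent(SketchEventMessage<T> event) {
        if (event instanceof SketchDomainEvent) {
            // Already a domain event: return the same instance unchanged.
            return (SketchDomainEvent<T>) event;
        }
        // Otherwise wrap it: null type, aggregate identifier taken from the
        // message identifier, and sequence number 0.
        return new SketchDomainEvent<>(null, event.identifier(), 0L,
                                       event.identifier(), event.payload());
    }
}
```

Because every message ends up with an aggregate identifier and a sequence number, a single entry type can hold both kinds of event.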

Type Parameters:
- T - the type of payload in the message

Parameters:
- event - the input event message

protected List<Object[]> fetchEvents(GapAwareTrackingToken token)

Returns a batch of event data as object entries in the event storage with a global index greater than the given token. The size of the batch is decided by BatchingEventStorageEngine.batchSize().

Parameters:
- token - Object describing the global index of the last processed event.

protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize)

Description copied from class: BatchingEventStorageEngine

Returns a batch of serialized event data entries in the event storage that have a TrackingToken greater than the given lastToken. Event entries in the stream should be ordered by tracking token. If the lastToken is null, a stream containing all events should be returned.

Only if the returned List is empty does the event storage assume that the backing database holds no further applicable entries.
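
The "empty batch means no more entries" rule can be sketched from the point of view of a caller that drains the store batch by batch. This is an illustration only, not Axon's streaming code: a Long global index stands in for the TrackingToken, `fetch` is a hypothetical stand-in for fetchTrackedEvents(lastToken, batchSize), and `fetchFrom` is an in-memory substitute for the database query.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

final class TrackedBatchingSketch {

    // Reads everything by repeatedly asking for the batch after the last seen index.
    // A null token means "start from the beginning", as described above.
    static List<Long> readAll(BiFunction<Long, Integer, List<Long>> fetch, int batchSize) {
        List<Long> result = new ArrayList<>();
        Long lastToken = null;
        while (true) {
            List<Long> batch = fetch.apply(lastToken, batchSize);
            if (batch.isEmpty()) {
                // Only an empty batch signals that no further entries are available;
                // a short batch is not treated as the end.
                break;
            }
            result.addAll(batch);
            lastToken = batch.get(batch.size() - 1);
        }
        return result;
    }

    // In-memory stand-in for the store: global indexes in ascending order.
    static List<Long> fetchFrom(List<Long> store, Long lastToken, int batchSize) {
        List<Long> batch = new ArrayList<>();
        for (Long index : store) {
            boolean afterToken = lastToken == null || index > lastToken;
            if (afterToken && batch.size() < batchSize) {
                batch.add(index);
            }
        }
        return batch;
    }
}
```

Note that the loop keeps going after a batch smaller than `batchSize`; it stops only when a fetch returns nothing.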

Specified by: fetchTrackedEvents in class BatchingEventStorageEngine

Parameters:
- lastToken - Object describing the global index of the last processed event, or null to create a stream of all events in the store
- batchSize - The maximum number of events that should be returned

protected List<? extends DomainEventData<?>> fetchDomainEvents(String aggregateIdentifier, long firstSequenceNumber, int batchSize)

Description copied from class: BatchingEventStorageEngine

Returns a batch of events published by an aggregate with given aggregateIdentifier.

The events in the returned batch should be ordered by sequence number. The first event in the batch should have a sequence number equal to or larger than the given firstSequenceNumber. Implementations should make sure the returned batch does not contain gaps between events due to uncommitted storage transactions.

If the returned number of entries is smaller than the given batchSize, it is assumed that the storage holds no further applicable entries. Implementations for which this is not always the case should override BatchingEventStorageEngine.fetchForAggregateUntilEmpty() to return true, and preferably configure a BatchingEventStorageEngine.Builder.finalAggregateBatchPredicate(Predicate) that provides a better heuristic for detecting the last batch in a stream.

Specified by: fetchDomainEvents in class BatchingEventStorageEngine

Parameters:
- aggregateIdentifier - The identifier of the aggregate to open a stream for
- firstSequenceNumber - The sequence number of the first expected event entry
- batchSize - The maximum number of events that should be returned

protected Stream<? extends DomainEventData<?>> readSnapshotData(String aggregateIdentifier)

Description copied from class: AbstractEventStorageEngine

Returns a stream of serialized event entries for given aggregateIdentifier if the backing database contains a snapshot of the aggregate.

It is required that specific event storage engines return snapshots in descending order of their sequence number.
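
The ordering requirement above matters because a caller can then take the first element of the stream as the most recent snapshot. A minimal in-memory sketch follows; Entry and the list-backed lookup are hypothetical stand-ins for the stored snapshot entries and the database query.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;

final class SnapshotOrderSketch {

    // Hypothetical stand-in for a stored snapshot entry.
    static final class Entry {
        final String aggregateIdentifier;
        final long sequenceNumber;

        Entry(String aggregateIdentifier, long sequenceNumber) {
            this.aggregateIdentifier = aggregateIdentifier;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Returns the snapshots of one aggregate, highest sequence number first.
    static Stream<Entry> readSnapshotData(List<Entry> stored, String aggregateIdentifier) {
        return stored.stream()
                .filter(entry -> entry.aggregateIdentifier.equals(aggregateIdentifier))
                .sorted(Comparator.comparingLong((Entry entry) -> entry.sequenceNumber).reversed());
    }

    // With descending order, the first element is the latest snapshot.
    static Optional<Entry> latestSnapshot(List<Entry> stored, String aggregateIdentifier) {
        return readSnapshotData(stored, aggregateIdentifier).findFirst();
    }
}
```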

Specified by: readSnapshotData in class AbstractEventStorageEngine

Parameters:
- aggregateIdentifier - The aggregate identifier to fetch a snapshot for

protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer)

Description copied from class: AbstractEventStorageEngine

Append given events to the backing database. Use the given serializer to serialize the event's payload and metadata.

Specified by: appendEvents in class AbstractEventStorageEngine

Parameters:
- events - Events to append to the database
- serializer - Serializer used to convert the events to a suitable format for storage

protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer)

Description copied from class: AbstractEventStorageEngine

Store the given snapshot of an Aggregate. Implementations may override any existing snapshot of the Aggregate with the given snapshot.

Specified by: storeSnapshot in class AbstractEventStorageEngine

Parameters:
- snapshot - Snapshot Event of the aggregate
- serializer - Serializer used to convert the snapshot event to a suitable format for storage

public Optional<Long> lastSequenceNumberFor(@Nonnull String aggregateIdentifier)

Description copied from interface: EventStorageEngine

Returns the last known sequence number for the given aggregateIdentifier.

While it's recommended to use the sequence numbers from the DomainEventStream, there are cases where knowing the sequence number is required without having read the actual events. In such cases, this method is a viable alternative.
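
One such case is working out the next sequence number for an aggregate without loading its events. The sketch below shows how the Optional result might be consumed. LastSequenceSketch is a hypothetical in-memory stand-in for the storage lookup, and starting at 0 for an unknown aggregate is an assumption of this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class LastSequenceSketch {

    // Hypothetical in-memory index: aggregate identifier -> highest stored sequence number.
    private final Map<String, Long> lastSequenceByAggregate = new HashMap<>();

    void recordStored(String aggregateIdentifier, long sequenceNumber) {
        // Keep the highest sequence number seen for this aggregate.
        lastSequenceByAggregate.merge(aggregateIdentifier, sequenceNumber, Long::max);
    }

    // Empty when no event is known for the aggregate.
    Optional<Long> lastSequenceNumberFor(String aggregateIdentifier) {
        return Optional.ofNullable(lastSequenceByAggregate.get(aggregateIdentifier));
    }

    // One past the last known sequence number, or 0 (assumed start) for an unknown aggregate.
    long nextSequenceNumberFor(String aggregateIdentifier) {
        return lastSequenceNumberFor(aggregateIdentifier)
                .map(last -> last + 1)
                .orElse(0L);
    }
}
```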
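
Parameters:
- aggregateIdentifier - The identifier to find the last sequence number for

public TrackingToken createTailToken()

Description copied from interface: EventStorageEngine

Creates a token that is at the tail of an event stream - that tracks events from the beginning of time.

Returns: a tracking token at the tail of an event stream; if the event stream is empty, null is returned

public TrackingToken createHeadToken()

Description copied from interface: EventStorageEngine

Creates a token that is at the head of an event stream - that tracks all new events.

Returns: a tracking token at the head of an event stream; if the event stream is empty, null is returned

public TrackingToken createTokenAt(@Nonnull Instant dateTime)

Description copied from interface: EventStorageEngine

Creates a token that tracks all events after given dateTime. If there is an event exactly at the given dateTime, it will be tracked too.

Parameters:
- dateTime - The date and time used as the criterion for creating the tracking token. The tracking token should point to the very first event before this date and time.

Returns: a tracking token at the given dateTime; if there are no events matching this criterion, null is returned

protected void deleteSnapshots(String aggregateIdentifier, long sequenceNumber)

Deletes all snapshots from the underlying storage with given aggregateIdentifier.

Parameters:
- aggregateIdentifier - the identifier of the aggregate to delete snapshots for
- sequenceNumber - The sequence number from which value snapshots should be kept

protected Object createEventEntity(EventMessage<?> eventMessage, Serializer serializer)

Returns a Jpa event entity for given eventMessage. Use the given serializer to serialize the payload and metadata of the event.

Parameters:
- eventMessage - the event message to store
- serializer - the serializer to serialize the payload and metadata

protected Object createSnapshotEntity(DomainEventMessage<?> snapshot, Serializer serializer)

Returns a Jpa snapshot entity for given snapshot of an aggregate. Use the given serializer to serialize the payload and metadata of the snapshot event.

Parameters:
- snapshot - the domain event message containing a snapshot of the aggregate
- serializer - the serializer to serialize the payload and metadata

protected String domainEventEntryEntityName()

Returns the name of the Jpa event entity.

protected String snapshotEventEntryEntityName()

Returns the name of the Snapshot event entity.

protected jakarta.persistence.EntityManager entityManager()

Provides an EntityManager instance for storing and fetching event data.

public void setGapTimeout(int gapTimeout)

Sets the amount of time until a 'gap' in a TrackingToken may be considered timed out.

Parameters:
- gapTimeout - The amount of time, in milliseconds, until a gap may be considered timed out.

public void setGapCleaningThreshold(int gapCleaningThreshold)

Sets the threshold of number of gaps in a token before an attempt to clean gaps up is taken.

Parameters:
- gapCleaningThreshold - The number of gaps before triggering a cleanup.

Copyright © 2010–2023. All rights reserved.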