public class JdbcEventStorageEngine extends BatchingEventStorageEngine
EventStorageEngine implementation that uses JDBC to store and
fetch events.
By default, it stores the payload of events as a serialized blob of bytes. It uses other columns to store meta-data that allows quick finding of DomainEvents for a specific aggregate in the correct order.
Before using this store make sure the database contains a table named EventSchema.domainEventTable() and
EventSchema.snapshotTable() in which to store events and snapshots in respectively. For convenience, these
tables can be constructed through the createSchema(EventTableFactory) operation.
| Modifier and Type | Class and Description |
|---|---|
static class |
JdbcEventStorageEngine.Builder
Builder class to instantiate a
JdbcEventStorageEngine. |
upcasterChain| Modifier | Constructor and Description |
|---|---|
protected |
JdbcEventStorageEngine(JdbcEventStorageEngine.Builder builder)
Instantiate a
JdbcEventStorageEngine based on the fields contained in the JdbcEventStorageEngine.Builder. |
| Modifier and Type | Method and Description |
|---|---|
protected PreparedStatement |
appendEvents(Connection connection,
List<? extends EventMessage<?>> events,
Serializer serializer)
Creates a statement to be used at
appendEvents(List, Serializer). |
protected void |
appendEvents(List<? extends EventMessage<?>> events,
Serializer serializer)
Append given
events to the backing database. |
protected PreparedStatement |
appendSnapshot(Connection connection,
DomainEventMessage<?> snapshot,
Serializer serializer)
Creates a statement to be used at
AbstractEventStorageEngine.storeSnapshot(DomainEventMessage). |
static JdbcEventStorageEngine.Builder |
builder()
Instantiate a Builder to be able to create a
JdbcEventStorageEngine. |
protected PreparedStatement |
cleanGaps(Connection connection,
SortedSet<Long> gaps)
Creates a statement to be used at
cleanGaps(TrackingToken). |
TrackingToken |
createHeadToken()
Creates a token that is at the head of an event stream - that tracks all new events.
|
protected PreparedStatement |
createHeadToken(Connection connection)
Creates a statement to be used at
createHeadToken(). |
void |
createSchema(EventTableFactory schemaFactory)
Performs the DDL queries to create the schema necessary for this storage engine implementation.
|
TrackingToken |
createTailToken()
Creates a token that is at the tail of an event stream - that tracks events from the beginning of time.
|
protected PreparedStatement |
createTailToken(Connection connection)
Creates a statement to be used at
createTailToken(). |
protected PreparedStatement |
createTokenAt(Connection connection,
Instant dateTime)
Creates a statement to be used at
createTokenAt(Instant). |
TrackingToken |
createTokenAt(Instant dateTime)
Creates a token that tracks all events after given
dateTime. |
protected PreparedStatement |
deleteSnapshots(Connection connection,
String aggregateIdentifier,
long sequenceNumber)
Creates a statement to be used at
AbstractEventStorageEngine.storeSnapshot(DomainEventMessage). |
protected String |
domainEventFields()
Deprecated.
in favor of
EventSchema.domainEventFields() |
protected List<? extends DomainEventData<?>> |
fetchDomainEvents(String aggregateIdentifier,
long firstSequenceNumber,
int batchSize)
Returns a batch of events published by an aggregate with given
aggregateIdentifier. |
protected boolean |
fetchForAggregateUntilEmpty()
Specifies whether the
BatchingEventStorageEngine.readEventData(String, long) should proceed fetching events for an aggregate until
an empty batch is returned. |
protected PreparedStatement |
fetchTrackedEvents(Connection connection,
long index)
Creates a statement to be used at
fetchTrackedEvents(TrackingToken, int). |
protected List<? extends TrackedEventData<?>> |
fetchTrackedEvents(TrackingToken lastToken,
int batchSize)
Returns a batch of serialized event data entries in the event storage that have a
TrackingToken greater
than the given lastToken. |
protected Connection |
getConnection()
Returns a
Connection to the database. |
protected DomainEventData<?> |
getDomainEventData(ResultSet resultSet)
Extracts the next domain event entry from the given
resultSet. |
protected DomainEventData<?> |
getSnapshotData(ResultSet resultSet)
Extracts the next snapshot entry from the given
resultSet. |
protected TrackedEventData<?> |
getTrackedEventData(ResultSet resultSet,
GapAwareTrackingToken previousToken)
Extracts the next tracked event entry from the given
resultSet. |
protected PreparedStatement |
lastSequenceNumberFor(Connection connection,
String aggregateIdentifier)
Creates a statement to be used at
lastSequenceNumberFor(String). |
Optional<Long> |
lastSequenceNumberFor(String aggregateIdentifier)
Returns the last known sequence number for the given
aggregateIdentifier. |
protected PreparedStatement |
readEventData(Connection connection,
String identifier,
long firstSequenceNumber,
int batchSize)
Creates a statement to be used at
fetchDomainEvents(String, long, int) |
protected PreparedStatement |
readEventData(Connection connection,
TrackingToken lastToken,
int batchSize)
Creates a statement to read tracked event entries stored since given tracking token.
|
protected PreparedStatement |
readEventDataWithGaps(Connection connection,
long globalIndex,
int batchSize,
List<Long> gaps)
Creates a statement to be used at
fetchTrackedEvents(TrackingToken, int) |
protected PreparedStatement |
readEventDataWithoutGaps(Connection connection,
long globalIndex,
int batchSize)
Creates a statement to be used at
fetchTrackedEvents(TrackingToken, int) |
protected <T> T |
readPayload(ResultSet resultSet,
String columnName)
Reads a serialized object from the given
resultSet at given columnIndex. |
protected PreparedStatement |
readSnapshotData(Connection connection,
String identifier)
Creates a statement to be used at
readSnapshotData(String). |
protected Stream<? extends DomainEventData<?>> |
readSnapshotData(String aggregateIdentifier)
Returns a stream of serialized event entries for given
aggregateIdentifier if the backing database
contains a snapshot of the aggregate. |
protected Object |
readTimeStamp(ResultSet resultSet,
String columnName)
Reads a timestamp from the given
resultSet at given columnIndex. |
protected EventSchema |
schema()
Returns the
EventSchema that defines the table and column names of event tables in the database. |
void |
setGapCleaningThreshold(int gapCleaningThreshold)
Deprecated.
Use the
gapCleaningThreshold(int) in the builder()
instead |
void |
setGapTimeout(int gapTimeout)
Deprecated.
Use the
gapTimeout(int) in the builder() instead |
protected void |
storeSnapshot(DomainEventMessage<?> snapshot,
Serializer serializer)
Store the given
snapshot of an Aggregate. |
protected String |
trackedEventFields()
Deprecated.
in favor of
EventSchema.trackedEventFields() |
protected void |
writeTimestamp(PreparedStatement preparedStatement,
int position,
Instant timestamp)
Write a timestamp from a
Instant to a data value suitable for the database scheme. |
batchSize, readEventData, readEventDataappendEvents, getEventSerializer, getSnapshotSerializer, handlePersistenceException, readEvents, readEvents, readSnapshot, storeSnapshotclone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, waitappendEvents, readEventsprotected JdbcEventStorageEngine(JdbcEventStorageEngine.Builder builder)
JdbcEventStorageEngine based on the fields contained in the JdbcEventStorageEngine.Builder.
Will assert that the event and snapshot Serializer, the ConnectionProvider and TransactionManager are not null, and will throw an AxonConfigurationException if any of them is
null.
builder - the JdbcEventStorageEngine.Builder used to instantiate a JdbcEventStorageEngine instancepublic static JdbcEventStorageEngine.Builder builder()
JdbcEventStorageEngine.
The following configurable fields have defaults:
EventUpcaster defaults to an NoOpEventUpcaster.PersistenceExceptionResolver is defaulted to a JdbcSQLErrorCodesResolversnapshotFilter defaults to a SnapshotFilter.allowAll() instance.batchSize defaults to an integer of size 100.dataType is defaulted to the byte[] type.EventSchema defaults to an EventSchema() call.maxGapOffset defaults to an integer of size 10000.lowestGlobalSequence defaults to a long of size 1.gapTimeout defaults to an integer of size 60000 (1 minute).gapCleaningThreshold defaults to an integer of size 250.extendedGapCheckEnabled defaults to true.createTokenAt defaults to JdbcEventStorageEngineStatements.createTokenAt(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.time.Instant).appendEvents defaults to JdbcEventStorageEngineStatements.appendEvents(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.lang.Class<?>, java.util.List<? extends org.axonframework.eventhandling.EventMessage<?>>, org.axonframework.serialization.Serializer, org.axonframework.eventsourcing.eventstore.jdbc.statements.TimestampWriter).lastSequenceNumberFor defaults to JdbcEventStorageEngineStatements.lastSequenceNumberFor(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.lang.String).createTailToken defaults to JdbcEventStorageEngineStatements.createTailToken(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema).createHeadToken defaults to JdbcEventStorageEngineStatements.createHeadToken(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema).appendSnapshot defaults to JdbcEventStorageEngineStatements.appendSnapshot(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.lang.Class<?>, org.axonframework.eventhandling.DomainEventMessage<?>, org.axonframework.serialization.Serializer, org.axonframework.eventsourcing.eventstore.jdbc.statements.TimestampWriter).deleteSnapshots defaults to JdbcEventStorageEngineStatements.deleteSnapshots(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.lang.String, long).fetchTrackedEvents defaults to JdbcEventStorageEngineStatements.fetchTrackedEvents(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, long).cleanGaps defaults to JdbcEventStorageEngineStatements.cleanGaps(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.util.SortedSet<java.lang.Long>).readEventDataForAggregate defaults to JdbcEventStorageEngineStatements.readEventDataForAggregate(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.lang.String, long, int).readSnapshotData defaults to JdbcEventStorageEngineStatements.readSnapshotData(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, java.lang.String).readEventDataWithoutGaps defaults to JdbcEventStorageEngineStatements.readEventDataWithoutGaps(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, long, int).readEventDataWithGaps defaults to JdbcEventStorageEngineStatements.readEventDataWithGaps(java.sql.Connection, org.axonframework.eventsourcing.eventstore.jdbc.EventSchema, long, int, java.util.List<java.lang.Long>).
The event and snapshot Serializer, ConnectionProvider and TransactionManager are hard
requirements and as such should be provided.
JdbcEventStorageEngineprotected PreparedStatement createTokenAt(Connection connection, Instant dateTime) throws SQLException
createTokenAt(Instant).connection - The connection to the database.dateTime - The dateTime where the token will be created.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement appendEvents(Connection connection, List<? extends EventMessage<?>> events, Serializer serializer) throws SQLException
appendEvents(List, Serializer).connection - The connection to the database.events - The events to be added.serializer - The serializer for the payload and metadata.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement lastSequenceNumberFor(Connection connection, String aggregateIdentifier) throws SQLException
lastSequenceNumberFor(String).connection - The connection to the database.aggregateIdentifier - The identifier of the aggregate.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement createTailToken(Connection connection) throws SQLException
createTailToken().connection - The connection to the database.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement createHeadToken(Connection connection) throws SQLException
createHeadToken().connection - The connection to the database.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement appendSnapshot(Connection connection, DomainEventMessage<?> snapshot, Serializer serializer) throws SQLException
AbstractEventStorageEngine.storeSnapshot(DomainEventMessage).connection - The connection to the database.snapshot - The snapshot to be appended.serializer - The serializer for the payload and metadata.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement deleteSnapshots(Connection connection, String aggregateIdentifier, long sequenceNumber) throws SQLException
AbstractEventStorageEngine.storeSnapshot(DomainEventMessage).connection - The connection to the database.aggregateIdentifier - The identifier of the aggregate taken from the snapshot.sequenceNumber - The sequence number taken from the snapshot.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement fetchTrackedEvents(Connection connection, long index) throws SQLException
fetchTrackedEvents(TrackingToken, int).connection - The connection to the database.index - The index taken from the tracking token.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement cleanGaps(Connection connection, SortedSet<Long> gaps) throws SQLException
cleanGaps(TrackingToken).connection - The connection to the database.gaps - The Set of gaps taken from the tracking token.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement readEventData(Connection connection, String identifier, long firstSequenceNumber, int batchSize) throws SQLException
fetchDomainEvents(String, long, int)connection - The connection to the database.identifier - The identifier of the aggregate.firstSequenceNumber - The expected sequence number of the first returned entry.batchSize - The number of items to include in the batch.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement readSnapshotData(Connection connection, String identifier) throws SQLException
readSnapshotData(String).connection - The connection to the database.identifier - The identifier of the aggregate.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement readEventDataWithoutGaps(Connection connection, long globalIndex, int batchSize) throws SQLException
fetchTrackedEvents(TrackingToken, int)connection - The connection to the database.globalIndex - The index taken from the tracking token.batchSize - The number of items to include in the batchPreparedStatement.SQLException - when an exception occurs while creating the prepared statement.protected PreparedStatement readEventDataWithGaps(Connection connection, long globalIndex, int batchSize, List<Long> gaps) throws SQLException
fetchTrackedEvents(TrackingToken, int)connection - The connection to the database.globalIndex - The index taken from the tracking token.batchSize - The number of items to include in the batchgaps - The Set of gaps taken from the tracking token.PreparedStatement.SQLException - when an exception occurs while creating the prepared statement.public void createSchema(EventTableFactory schemaFactory)
schemaFactory - Factory of the event schema.EventStoreException - when an error occurs executing SQL statements.protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer)
AbstractEventStorageEngineevents to the backing database. Use the given serializer to serialize the event's
payload and metadata.appendEvents in class AbstractEventStorageEngineevents - Events to append to the databaseserializer - Serializer used to convert the events to a suitable format for storageprotected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer)
AbstractEventStorageEnginesnapshot of an Aggregate. Implementations may override any existing snapshot of the
Aggregate with the given snapshot.storeSnapshot in class AbstractEventStorageEnginesnapshot - Snapshot Event of the aggregateserializer - Serializer used to convert the snapshot event to a suitable format for storagepublic Optional<Long> lastSequenceNumberFor(@Nonnull String aggregateIdentifier)
EventStorageEngineaggregateIdentifier.
While it's recommended to use the sequence numbers from the DomainEventStream, there are cases where
knowing the sequence number is required, without having read the actual events. In such case, this method is a
viable alternative.
aggregateIdentifier - The identifier to find the last sequence number forpublic TrackingToken createTailToken()
EventStorageEnginenull is returnedpublic TrackingToken createHeadToken()
EventStorageEnginenull is returnedpublic TrackingToken createTokenAt(@Nonnull Instant dateTime)
EventStorageEnginedateTime. If there is an event exactly at the given
dateTime, it will be tracked too.dateTime - The date and time for determining criteria how the tracking token should be created. A tracking
token should point to very first event before this date and time.dateTime, if there aren't events matching this criteria null is returnedprotected List<? extends DomainEventData<?>> fetchDomainEvents(String aggregateIdentifier, long firstSequenceNumber, int batchSize)
BatchingEventStorageEngineaggregateIdentifier.
The sequence numbers in the returned batch should be ordered by sequence number. The first event in the batch
should have a sequence number equal to or larger than given firstSequenceNumber. Implementations should
make sure the returned batch does not contain gaps between events due to uncommitted storage transactions.
If the returned number of entries is smaller than the given batchSize it is assumed that the storage
holds no further applicable entries. Implementations for which this is not always the case should override
BatchingEventStorageEngine.fetchForAggregateUntilEmpty() to return true and preferably configure a better
BatchingEventStorageEngine.Builder.finalAggregateBatchPredicate(Predicate) to provide a better heuristic for detecting the last
batch in a stream.fetchDomainEvents in class BatchingEventStorageEngineaggregateIdentifier - The identifier of the aggregate to open a stream forfirstSequenceNumber - The sequence number of the first excepted event entrybatchSize - The maximum number of events that should be returnedprotected boolean fetchForAggregateUntilEmpty()
BatchingEventStorageEngineBatchingEventStorageEngine.readEventData(String, long) should proceed fetching events for an aggregate until
an empty batch is returned. Defaults to false, as Aggregate event batches typically do not have gaps in
them.fetchForAggregateUntilEmpty in class BatchingEventStorageEngineboolean specifying whether BatchingEventStorageEngine.readEventData(String, long) should proceed fetching events
for an aggregate until an empty batch is returnedprotected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize)
BatchingEventStorageEngineTrackingToken greater
than the given lastToken. Event entries in the stream should be ordered by tracking token. If the lastToken is null a stream containing all events should be returned.
Only if the returned List is empty the event storage assumes that the backing database holds no further applicable entries.
fetchTrackedEvents in class BatchingEventStorageEnginelastToken - Object describing the global index of the last processed event or null to create a
stream of all events in the storebatchSize - The maximum number of events that should be returnedprotected Stream<? extends DomainEventData<?>> readSnapshotData(String aggregateIdentifier)
AbstractEventStorageEngineaggregateIdentifier if the backing database
contains a snapshot of the aggregate.
It is required that specific event storage engines return snapshots in descending order of their sequence number.
readSnapshotData in class AbstractEventStorageEngineaggregateIdentifier - The aggregate identifier to fetch a snapshot forprotected PreparedStatement readEventData(Connection connection, TrackingToken lastToken, int batchSize) throws SQLException
trackingToken
of null to create a statement for all entries in the storage.connection - The connection to the database.lastToken - Object describing the global index of the last processed event or null to return all
entries in the store.batchSize - The number of items to include in the batchPreparedStatement that returns event entries for the given query when executed.SQLException - when an exception occurs while creating the prepared statement.protected TrackedEventData<?> getTrackedEventData(ResultSet resultSet, GapAwareTrackingToken previousToken) throws SQLException
resultSet.resultSet - The results of a query for tracked events.previousToken - The last known token of the tracker before obtaining this result set.SQLException - when an exception occurs while creating the event data.protected DomainEventData<?> getDomainEventData(ResultSet resultSet) throws SQLException
resultSet.resultSet - The results of a query for domain events of an aggregate.SQLException - when an exception occurs while creating the event data.protected DomainEventData<?> getSnapshotData(ResultSet resultSet) throws SQLException
resultSet.resultSet - The results of a query for a snapshot of an aggregate.SQLException - when an exception occurs while creating the event data.protected Object readTimeStamp(ResultSet resultSet, String columnName) throws SQLException
resultSet at given columnIndex. The resultSet is positioned in
the row that contains the data. This method must not change the row in the result set.resultSet - The resultSet containing the stored data.columnName - The name of the column containing the timestamp.SQLException - when an exception occurs reading from the resultSet.protected void writeTimestamp(PreparedStatement preparedStatement, int position, Instant timestamp) throws SQLException
Instant to a data value suitable for the database scheme.preparedStatement - the statement to update.position - the position of the timestamp parameter in the statement.timestamp - Instant to convert.SQLException - if modification of the statement fails.protected <T> T readPayload(ResultSet resultSet, String columnName) throws SQLException
resultSet at given columnIndex. The resultSet is
positioned in the row that contains the data. This method must not change the row in the result set.resultSet - The resultSet containing the stored data.columnName - The name of the column containing the payload.SQLException - when an exception occurs reading from the resultSet.@Deprecated protected String domainEventFields()
EventSchema.domainEventFields()@Deprecated protected String trackedEventFields()
EventSchema.trackedEventFields()protected EventSchema schema()
EventSchema that defines the table and column names of event tables in the database.protected Connection getConnection()
Connection to the database.@Deprecated public void setGapTimeout(int gapTimeout)
gapTimeout(int) in the builder() insteadgapTimeout - The amount of time, in milliseconds until a gap may be considered timed out.@Deprecated public void setGapCleaningThreshold(int gapCleaningThreshold)
gapCleaningThreshold(int) in the builder()
insteadgapCleaningThreshold - The number of gaps before triggering a cleanup.Copyright © 2010–2023. All rights reserved.