public class JdbcEventStorageEngine extends BatchingEventStorageEngine
By default, the payload of events is stored as a serialized blob of bytes. Other columns store metadata that allows DomainEvents for a specific aggregate to be found quickly and in the correct order.
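The storage layout described above can be pictured with a small stand-alone sketch. It does not use the engine or a database; `EventRowSketch` and its fields are hypothetical names chosen for illustration, and the real column set is defined by the EventSchema. The point is only that the payload stays an opaque byte array while the lookup relies on the metadata columns.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical, simplified stand-in for one row of an event table (not the real schema).
final class EventRowSketch {
    final String aggregateIdentifier; // metadata column: which aggregate published the event
    final long sequenceNumber;        // metadata column: position of the event within that aggregate
    final byte[] payload;             // serialized payload, treated as an opaque blob

    EventRowSketch(String aggregateIdentifier, long sequenceNumber, byte[] payload) {
        this.aggregateIdentifier = aggregateIdentifier;
        this.sequenceNumber = sequenceNumber;
        this.payload = payload;
    }

    // Finds the rows of one aggregate in sequence order using only the metadata columns.
    static List<EventRowSketch> eventsFor(List<EventRowSketch> table, String aggregateIdentifier) {
        return table.stream()
                .filter(row -> row.aggregateIdentifier.equals(aggregateIdentifier))
                .sorted(Comparator.comparingLong((EventRowSketch row) -> row.sequenceNumber))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<EventRowSketch> table = new ArrayList<>();
        table.add(new EventRowSketch("order-1", 1, "shipped".getBytes(StandardCharsets.UTF_8)));
        table.add(new EventRowSketch("order-2", 0, "created".getBytes(StandardCharsets.UTF_8)));
        table.add(new EventRowSketch("order-1", 0, "created".getBytes(StandardCharsets.UTF_8)));
        for (EventRowSketch row : eventsFor(table, "order-1")) {
            System.out.println(row.sequenceNumber + " " + new String(row.payload, StandardCharsets.UTF_8));
        }
    }
}
```

In a real table the filter and ordering would be expressed in SQL over indexed columns; the in-memory stream here only mirrors that idea.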
| Constructor and Description | 
|---|
| JdbcEventStorageEngine(ConnectionProvider connectionProvider, TransactionManager transactionManager): Initializes an EventStorageEngine that uses JDBC to store and load events using the default EventSchema. |
| JdbcEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, ConnectionProvider connectionProvider, TransactionManager transactionManager): Initializes an EventStorageEngine that uses JDBC to store and load events using the default EventSchema. |
| JdbcEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, Serializer eventSerializer, ConnectionProvider connectionProvider, TransactionManager transactionManager): Initializes an EventStorageEngine that uses JDBC to store and load events using the default EventSchema. |
| JdbcEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, Serializer eventSerializer, Integer batchSize, ConnectionProvider connectionProvider, TransactionManager transactionManager, Class<?> dataType, EventSchema schema, Integer maxGapOffset, Long lowestGlobalSequence): Initializes an EventStorageEngine that uses JDBC to store and load events. |
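The maxGapOffset parameter of the last constructor bounds how far behind the highest known index a missing event may be before the engine stops looking for it. The following is a minimal sketch of that rule only, not the engine's actual implementation: `GapOffsetSketch` and `gapsWithinOffset` are hypothetical names, and treating a distance exactly equal to maxGapOffset as still within range is an assumption.

```java
import java.util.Arrays;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical illustration of the maxGapOffset rule; not part of the engine's API.
final class GapOffsetSketch {

    // Keeps only the gaps whose distance to highestIndex does not exceed maxGapOffset.
    // Assumption: a distance exactly equal to maxGapOffset is still kept.
    static SortedSet<Long> gapsWithinOffset(SortedSet<Long> gaps, long highestIndex, int maxGapOffset) {
        SortedSet<Long> kept = new TreeSet<>();
        for (long gap : gaps) {
            if (highestIndex - gap <= maxGapOffset) {
                kept.add(gap); // still close enough that the event may yet be committed
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        SortedSet<Long> gaps = new TreeSet<>(Arrays.asList(2L, 95L, 99L));
        // With the highest known index at 100 and an offset of 10, the gap at 2 is given up on.
        System.out.println(gapsWithinOffset(gaps, 100L, 10));
    }
}
```

A larger offset tolerates slower concurrent transactions at the cost of tracking more gaps per fetch.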
| Modifier and Type | Method and Description | 
|---|---|
| protected void | appendEvents(List<? extends EventMessage<?>> events, Serializer serializer): Append given events to the backing database. |
| protected PreparedStatement | appendSnapshot(Connection connection, DomainEventMessage<?> snapshot, Serializer serializer): Creates a statement to append the given snapshot to the event storage using given connection to the database. |
| void | createSchema(EventTableFactory schemaFactory): Performs the DDL queries to create the schema necessary for this storage engine implementation. |
| protected PreparedStatement | deleteSnapshots(Connection connection, String aggregateIdentifier, long sequenceNumber): Creates a statement to delete all snapshots of the aggregate with given aggregateIdentifier. |
| protected String | domainEventFields(): Returns a comma separated list of domain event column names to select from an event or snapshot entry. |
| protected List<? extends DomainEventData<?>> | fetchDomainEvents(String aggregateIdentifier, long firstSequenceNumber, int batchSize): Returns a batch of events published by an aggregate with given aggregateIdentifier. |
| protected List<? extends TrackedEventData<?>> | fetchTrackedEvents(TrackingToken lastToken, int batchSize): Returns a batch of serialized event data entries in the event storage that have a TrackingToken greater than the given lastToken. |
| protected Connection | getConnection(): Returns a Connection to the database. |
| protected DomainEventData<?> | getDomainEventData(ResultSet resultSet): Extracts the next domain event entry from the given resultSet. |
| protected DomainEventData<?> | getSnapshotData(ResultSet resultSet): Extracts the next snapshot entry from the given resultSet. |
| protected TrackedEventData<?> | getTrackedEventData(ResultSet resultSet, GapAwareTrackingToken previousToken): Extracts the next tracked event entry from the given resultSet. |
| Optional<Long> | lastSequenceNumberFor(String aggregateIdentifier): Returns the last known sequence number for the given aggregateIdentifier. |
| protected PreparedStatement | readEventData(Connection connection, String identifier, long firstSequenceNumber, int batchSize): Creates a statement to read domain event entries for an aggregate with given identifier starting with the first entry having a sequence number that is equal or larger than the given firstSequenceNumber. |
| protected PreparedStatement | readEventData(Connection connection, TrackingToken lastToken, int batchSize): Creates a statement to read tracked event entries stored since given tracking token. |
| protected <T> T | readPayload(ResultSet resultSet, String columnName): Reads a serialized object from the given resultSet at given columnName. |
| protected PreparedStatement | readSnapshotData(Connection connection, String identifier): Creates a statement to read the snapshot entry of an aggregate with given identifier. |
| protected Optional<? extends DomainEventData<?>> | readSnapshotData(String aggregateIdentifier): Returns an optional serialized event entry for given aggregateIdentifier if the backing database contains a snapshot of the aggregate. |
| protected Object | readTimeStamp(ResultSet resultSet, String columnName): Reads a timestamp from the given resultSet at given columnName. |
| protected EventSchema | schema(): Returns the EventSchema that defines the table and column names of event tables in the database. |
| void | setGapCleaningThreshold(int gapCleaningThreshold): Sets the threshold of number of gaps in a token before an attempt to clean gaps up is taken. |
| void | setGapTimeout(int gapTimeout): Sets the amount of time until a 'gap' in a TrackingToken may be considered timed out. |
| protected void | storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer): Store the given snapshot of an Aggregate. |
| protected String | trackedEventFields(): Returns a comma separated list of tracked domain event column names to select from an event entry. |
| protected void | writeTimestamp(PreparedStatement preparedStatement, int position, Instant timestamp): Write a timestamp from an Instant to a data value suitable for the database schema. |
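The table lists readTimeStamp, which returns a plain Object, and writeTimestamp, which accepts an Instant, so the concrete column representation is left to the schema. As a hedged illustration, the sketch below assumes the timestamp column holds ISO-8601 text; that choice, and the names `TimestampColumnSketch`, `toColumnValue`, and `fromColumnValue`, are assumptions made for the example rather than the engine's documented behavior.

```java
import java.time.Instant;

// Hypothetical conversion helpers, assuming the timestamp column stores ISO-8601 text.
final class TimestampColumnSketch {

    // Converts an Instant to the text that would be bound to the statement parameter.
    static String toColumnValue(Instant timestamp) {
        return timestamp.toString(); // Instant.toString() produces ISO-8601 in UTC
    }

    // Converts the raw column value read from a result set back to an Instant.
    static Instant fromColumnValue(Object columnValue) {
        return Instant.parse(columnValue.toString());
    }

    public static void main(String[] args) {
        Instant timestamp = Instant.parse("2018-01-01T12:00:00Z");
        String stored = toColumnValue(timestamp);
        System.out.println(stored + " round-trips: " + fromColumnValue(stored).equals(timestamp));
    }
}
```

A subclass that stores timestamps in a native SQL type would override both methods together so that reading and writing stay symmetric.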
Methods inherited from class BatchingEventStorageEngine
batchSize, readEventData, readEventData

Methods inherited from class AbstractEventStorageEngine
appendEvents, getEventSerializer, getSerializer, handlePersistenceException, readEvents, readEvents, readSnapshot, storeSnapshot

Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait

Methods inherited from interface EventStorageEngine
appendEvents, readEvents

public JdbcEventStorageEngine(ConnectionProvider connectionProvider, TransactionManager transactionManager)
Initializes an EventStorageEngine that uses JDBC to store and load events using the default EventSchema. The payload and metadata of events are stored as a serialized blob of bytes using a new XStreamSerializer.

Events are read in batches of 100. No upcasting is performed after the events have been fetched.

Parameters:
connectionProvider - The provider of connections to the underlying database.
transactionManager - The instance managing transactions around fetching event data. Required by certain databases for reading blob data.

public JdbcEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, ConnectionProvider connectionProvider, TransactionManager transactionManager)
Initializes an EventStorageEngine that uses JDBC to store and load events using the default EventSchema. The payload and metadata of events are stored as a serialized blob of bytes using the given serializer.

Events are read in batches of 100. The given upcasterChain is used to upcast events before deserialization. The same Serializer is used for both snapshots and events.

Parameters:
serializer - Used to serialize and deserialize event payload and metadata, and snapshots.
upcasterChain - Allows older revisions of serialized objects to be deserialized.
persistenceExceptionResolver - Detects concurrency exceptions from the backing database. If null, persistence exceptions are not explicitly resolved.
connectionProvider - The provider of connections to the underlying database.
transactionManager - The instance managing transactions around fetching event data. Required by certain databases for reading blob data.

public JdbcEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, Serializer eventSerializer, ConnectionProvider connectionProvider, TransactionManager transactionManager)
Initializes an EventStorageEngine that uses JDBC to store and load events using the default EventSchema. The payload and metadata of events are stored as a serialized blob of bytes using the given eventSerializer.

Events are read in batches of 100. The given upcasterChain is used to upcast events before deserialization.

Parameters:
serializer - Used to serialize and deserialize snapshots.
upcasterChain - Allows older revisions of serialized objects to be deserialized.
persistenceExceptionResolver - Detects concurrency exceptions from the backing database. If null, persistence exceptions are not explicitly resolved.
eventSerializer - Used to serialize and deserialize event payload and metadata.
connectionProvider - The provider of connections to the underlying database.
transactionManager - The instance managing transactions around fetching event data. Required by certain databases for reading blob data.

public JdbcEventStorageEngine(Serializer serializer, EventUpcaster upcasterChain, PersistenceExceptionResolver persistenceExceptionResolver, Serializer eventSerializer, Integer batchSize, ConnectionProvider connectionProvider, TransactionManager transactionManager, Class<?> dataType, EventSchema schema, Integer maxGapOffset, Long lowestGlobalSequence)
Initializes an EventStorageEngine that uses JDBC to store and load events.

Parameters:
serializer - Used to serialize and deserialize snapshots.
upcasterChain - Allows older revisions of serialized objects to be deserialized.
persistenceExceptionResolver - Detects concurrency exceptions from the backing database.
eventSerializer - Used to serialize and deserialize event payload and metadata.
batchSize - The number of events that should be read at each database access. When more than this number of events must be read to rebuild an aggregate's state, the events are read in batches of this size. Tip: if you use a snapshotter, make sure to choose snapshot trigger and batch size such that a single batch will generally retrieve all events required to rebuild an aggregate's state.
connectionProvider - The provider of connections to the underlying database.
transactionManager - The instance managing transactions around fetching event data. Required by certain databases for reading blob data.
dataType - The data type for serialized event payload and metadata.
schema - Object that describes the database schema of event entries.
maxGapOffset - The maximum distance in sequence numbers between a missing event and the event with the highest known index. If the gap is bigger it is assumed that the missing event will not be committed to the store anymore. This event storage engine will no longer look for those events the next time a batch is fetched.
lowestGlobalSequence - The first expected auto generated sequence number. For most data stores this is 1 unless the table has contained entries before.

public void createSchema(EventTableFactory schemaFactory)
Performs the DDL queries to create the schema necessary for this storage engine implementation.

Parameters:
schemaFactory - Factory of the event schema.

Throws:
EventStoreException - when an error occurs executing SQL statements.

protected void appendEvents(List<? extends EventMessage<?>> events, Serializer serializer)
Description copied from class: AbstractEventStorageEngine
Append given events to the backing database. Use the given serializer to serialize the event's payload and metadata.

Specified by:
appendEvents in class AbstractEventStorageEngine

Parameters:
events - Events to append to the database
serializer - Serializer used to convert the events to a suitable format for storage

protected void storeSnapshot(DomainEventMessage<?> snapshot, Serializer serializer)
Description copied from class: AbstractEventStorageEngine
Store the given snapshot of an Aggregate. Implementations may override any existing snapshot of the Aggregate with the given snapshot.

Specified by:
storeSnapshot in class AbstractEventStorageEngine

Parameters:
snapshot - Snapshot Event of the aggregate
serializer - Serializer used to convert the snapshot event to a suitable format for storage

public Optional<Long> lastSequenceNumberFor(String aggregateIdentifier)
Description copied from interface: EventStorageEngine
Returns the last known sequence number for the given aggregateIdentifier.

While it's recommended to use the sequence numbers from the DomainEventStream, there are cases where knowing the sequence number is required without having read the actual events. In such cases, this method is a viable alternative.

Parameters:
aggregateIdentifier - The identifier to find the last sequence number for

protected PreparedStatement appendSnapshot(Connection connection, DomainEventMessage<?> snapshot, Serializer serializer) throws SQLException
Creates a statement to append the given snapshot to the event storage using given connection to the database. Use the given serializer to serialize the payload and metadata of the event.

Parameters:
connection - The connection to the database.
snapshot - The snapshot to append.
serializer - The serializer that should be used when serializing the event's payload and metadata.

Returns:
A PreparedStatement that appends the snapshot when executed.

Throws:
SQLException - when an exception occurs while creating the prepared statement

protected PreparedStatement deleteSnapshots(Connection connection, String aggregateIdentifier, long sequenceNumber) throws SQLException
Creates a statement to delete all snapshots of the aggregate with given aggregateIdentifier.

Parameters:
connection - The connection to the database.
aggregateIdentifier - The identifier of the aggregate whose snapshots to delete.

Returns:
A PreparedStatement that deletes all the aggregate's snapshots when executed.

Throws:
SQLException - when an exception occurs while creating the prepared statement.

protected List<? extends DomainEventData<?>> fetchDomainEvents(String aggregateIdentifier, long firstSequenceNumber, int batchSize)
Description copied from class: BatchingEventStorageEngine
Returns a batch of events published by an aggregate with given aggregateIdentifier.

The events in the returned batch should be ordered by sequence number. The first event in the batch should have a sequence number equal to or larger than given firstSequenceNumber. Implementations should make sure the returned batch does not contain gaps between events due to uncommitted storage transactions.

If the returned number of entries is smaller than the given batchSize it is assumed that the storage holds no further applicable entries.

Specified by:
fetchDomainEvents in class BatchingEventStorageEngine

Parameters:
aggregateIdentifier - The identifier of the aggregate to open a stream for
firstSequenceNumber - The sequence number of the first expected event entry
batchSize - The maximum number of events that should be returned

protected List<? extends TrackedEventData<?>> fetchTrackedEvents(TrackingToken lastToken, int batchSize)
Description copied from class: BatchingEventStorageEngine
Returns a batch of serialized event data entries in the event storage that have a TrackingToken greater than the given lastToken. Event entries in the stream should be ordered by tracking token. If the lastToken is null a stream containing all events should be returned.

Only if the returned List is empty the event storage assumes that the backing database holds no further applicable entries.
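The termination rule for tracked events differs from the one for domain events: a short batch is not treated as the end, only an empty one is. The loop below is a minimal sketch of a caller following that rule. It is not the batching engine's code; `BatchReadSketch` and `readAll` are hypothetical, plain Long indexes stand in for tracking tokens, and -1 stands in for a null token.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

// Hypothetical caller loop; Long indexes stand in for tracking tokens.
final class BatchReadSketch {

    // Reads batches until an empty batch is returned, as the tracked-event contract describes.
    // fetchBatch takes (lastIndex, batchSize) and returns up to batchSize indexes greater than lastIndex.
    static List<Long> readAll(BiFunction<Long, Integer, List<Long>> fetchBatch, int batchSize) {
        List<Long> result = new ArrayList<>();
        long last = -1L; // assumption: -1 plays the role of a null token (read from the start)
        while (true) {
            List<Long> batch = fetchBatch.apply(last, batchSize);
            if (batch.isEmpty()) {
                break; // only an empty batch signals that no further entries exist
            }
            result.addAll(batch);
            last = batch.get(batch.size() - 1);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> store = Arrays.asList(0L, 1L, 2L, 3L, 4L, 5L, 6L);
        List<Long> all = readAll(
                (last, size) -> store.stream().filter(i -> i > last).limit(size).collect(Collectors.toList()),
                3);
        System.out.println(all);
    }
}
```

With seven entries and a batch size of 3, the third batch holds a single entry and the loop still issues one more fetch before stopping, which is the extra round trip this contract implies.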
Specified by:
fetchTrackedEvents in class BatchingEventStorageEngine

Parameters:
lastToken - Object describing the global index of the last processed event or null to create a stream of all events in the store
batchSize - The maximum number of events that should be returned

protected Optional<? extends DomainEventData<?>> readSnapshotData(String aggregateIdentifier)
Description copied from class: AbstractEventStorageEngine
Returns an optional serialized event entry for given aggregateIdentifier if the backing database contains a snapshot of the aggregate.

Specified by:
readSnapshotData in class AbstractEventStorageEngine

Parameters:
aggregateIdentifier - The aggregate identifier to fetch a snapshot for

protected PreparedStatement readEventData(Connection connection, String identifier, long firstSequenceNumber, int batchSize) throws SQLException
Creates a statement to read domain event entries for an aggregate with given identifier starting with the first entry having a sequence number that is equal or larger than the given firstSequenceNumber.

Parameters:
connection - The connection to the database.
identifier - The identifier of the aggregate.
firstSequenceNumber - The expected sequence number of the first returned entry.

Returns:
A PreparedStatement that returns event entries for the given query when executed.

Throws:
SQLException - when an exception occurs while creating the prepared statement.

protected PreparedStatement readEventData(Connection connection, TrackingToken lastToken, int batchSize) throws SQLException
Creates a statement to read tracked event entries stored since given tracking token. Pass a trackingToken of null to create a statement for all entries in the storage.

Parameters:
connection - The connection to the database.
lastToken - Object describing the global index of the last processed event or null to return all entries in the store.

Returns:
A PreparedStatement that returns event entries for the given query when executed.

Throws:
SQLException - when an exception occurs while creating the prepared statement.

protected PreparedStatement readSnapshotData(Connection connection, String identifier) throws SQLException
Creates a statement to read the snapshot entry of an aggregate with given identifier.

Parameters:
connection - The connection to the database.
identifier - The aggregate identifier.

Returns:
A PreparedStatement that returns the last snapshot entry of the aggregate (if any) when executed.

Throws:
SQLException - when an exception occurs while creating the prepared statement.

protected TrackedEventData<?> getTrackedEventData(ResultSet resultSet, GapAwareTrackingToken previousToken) throws SQLException
Extracts the next tracked event entry from the given resultSet.

Parameters:
resultSet - The results of a query for tracked events.
previousToken - The last known token of the tracker before obtaining this result set.

Throws:
SQLException - when an exception occurs while creating the event data.

protected DomainEventData<?> getDomainEventData(ResultSet resultSet) throws SQLException
Extracts the next domain event entry from the given resultSet.

Parameters:
resultSet - The results of a query for domain events of an aggregate.

Throws:
SQLException - when an exception occurs while creating the event data.

protected DomainEventData<?> getSnapshotData(ResultSet resultSet) throws SQLException
Extracts the next snapshot entry from the given resultSet.

Parameters:
resultSet - The results of a query for a snapshot of an aggregate.

Throws:
SQLException - when an exception occurs while creating the event data.

protected Object readTimeStamp(ResultSet resultSet, String columnName) throws SQLException
Reads a timestamp from the given resultSet at given columnName. The resultSet is positioned in the row that contains the data. This method must not change the row in the result set.

Parameters:
resultSet - The resultSet containing the stored data.
columnName - The name of the column containing the timestamp.

Throws:
SQLException - when an exception occurs reading from the resultSet.

protected void writeTimestamp(PreparedStatement preparedStatement, int position, Instant timestamp) throws SQLException
Write a timestamp from an Instant to a data value suitable for the database schema.

Parameters:
preparedStatement - the statement to update.
position - the position of the timestamp parameter in the statement.
timestamp - Instant to convert.

Throws:
SQLException - if modification of the statement fails.

protected <T> T readPayload(ResultSet resultSet, String columnName) throws SQLException
Reads a serialized object from the given resultSet at given columnName. The resultSet is positioned in the row that contains the data. This method must not change the row in the result set.

Parameters:
resultSet - The resultSet containing the stored data.
columnName - The name of the column containing the payload.

Throws:
SQLException - when an exception occurs reading from the resultSet.

protected String domainEventFields()
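Returns a comma separated list of domain event column names to select from an event or snapshot entry.

protected String trackedEventFields()
Returns a comma separated list of tracked domain event column names to select from an event entry.

protected EventSchema schema()
Returns the EventSchema that defines the table and column names of event tables in the database.

protected Connection getConnection()
Returns a Connection to the database.

public void setGapTimeout(int gapTimeout)
Sets the amount of time until a 'gap' in a TrackingToken may be considered timed out.

Parameters:
gapTimeout - The amount of time, in milliseconds, until a gap may be considered timed out.

public void setGapCleaningThreshold(int gapCleaningThreshold)
Sets the threshold of number of gaps in a token before an attempt to clean gaps up is taken.

Parameters:
gapCleaningThreshold - The number of gaps before triggering a cleanup.

Copyright © 2010–2018. All rights reserved.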