public abstract class AbstractMongoEventStorageStrategy extends Object implements StorageStrategy

Abstract implementation of a Mongo StorageStrategy. Implementations only need to provide methods to convert events and snapshots into Documents and vice versa.

Modifier and Type | Field | Description |
---|---|---|
protected static int | ORDER_ASC | The value to pass to Mongo to fetch documents in ascending order. |
protected static int | ORDER_DESC | The value to pass to Mongo to fetch documents in descending order. |

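
The two order constants are direction flags for Mongo queries. This page does not state their values; MongoDB's own sort convention is 1 for ascending and -1 for descending, and the sketch below assumes the constants follow it. The class SortDirectionSketch and its method are hypothetical and sort an in-memory list instead of querying Mongo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for illustration; not part of the documented class.
class SortDirectionSketch {
    // Assumed values, following MongoDB's sort convention (1 = ascending, -1 = descending).
    static final int ORDER_ASC = 1;
    static final int ORDER_DESC = -1;

    // Returns a sorted copy of the given sequence numbers in the requested direction.
    static List<Long> sortBySequenceNumber(List<Long> sequenceNumbers, int direction) {
        List<Long> sorted = new ArrayList<>(sequenceNumbers);
        Comparator<Long> ascending = Comparator.naturalOrder();
        sorted.sort(direction == ORDER_DESC ? ascending.reversed() : ascending);
        return sorted;
    }

    public static void main(String[] args) {
        List<Long> numbers = Arrays.asList(2L, 0L, 1L);
        System.out.println(sortBySequenceNumber(numbers, ORDER_ASC));  // prints [0, 1, 2]
        System.out.println(sortBySequenceNumber(numbers, ORDER_DESC)); // prints [2, 1, 0]
    }
}
```
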
Constructor | Description |
---|---|
AbstractMongoEventStorageStrategy(EventEntryConfiguration eventConfiguration, Duration lookBackTime) | Initializes a new StorageStrategy for an EventStorageEngine that uses Mongo. |

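
The constructor documentation on this page says that a null lookBackTime falls back to a 10 second interval. A minimal sketch of that kind of defaulting follows. The class LookBackDefaultSketch and both of its methods are hypothetical, and the boundary calculation is only one plausible reading of "the maximum time to look back", not the documented implementation.

```java
import java.time.Duration;

// Hypothetical helper for illustration; not part of the documented class.
class LookBackDefaultSketch {
    // Mirrors the documented fallback: a null lookBackTime means 10 seconds.
    static Duration effectiveLookBack(Duration lookBackTime) {
        return lookBackTime == null ? Duration.ofSeconds(10) : lookBackTime;
    }

    // Assumption: the look-back window is subtracted from the timestamp (epoch millis)
    // of the last seen event to get the oldest timestamp still worth checking.
    static long lookBackBoundary(long lastSeenTimestampMillis, Duration lookBackTime) {
        return lastSeenTimestampMillis - effectiveLookBack(lookBackTime).toMillis();
    }

    public static void main(String[] args) {
        System.out.println(effectiveLookBack(null).getSeconds()); // prints 10
        System.out.println(lookBackBoundary(60_000L, null));      // prints 50000
    }
}
```

Resolving the default once, in a single place, keeps later code free of null checks on the look-back value.
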
Modifier and Type | Method | Description |
---|---|---|
void | appendEvents(com.mongodb.client.MongoCollection<org.bson.Document> eventCollection, List<? extends EventMessage<?>> events, Serializer serializer) | Appends the given list of events to the given eventCollection. |
void | appendSnapshot(com.mongodb.client.MongoCollection<org.bson.Document> snapshotCollection, DomainEventMessage<?> snapshot, Serializer serializer) | Appends the given aggregate snapshot to the snapshotCollection. |
protected abstract Stream<org.bson.Document> | createEventDocuments(List<? extends EventMessage<?>> events, Serializer serializer) | Returns a stream of Mongo documents that represent the given batch of events. |
protected abstract org.bson.Document | createSnapshotDocument(DomainEventMessage<?> snapshot, Serializer serializer) | Returns a Mongo document for the given snapshot event. |
void | deleteSnapshots(com.mongodb.client.MongoCollection<org.bson.Document> snapshotCollection, String aggregateIdentifier, long sequenceNumber) | Deletes all snapshots of the aggregate with the given aggregateIdentifier from the given snapshotCollection whose sequence number is lower than the given sequenceNumber. |
void | ensureIndexes(com.mongodb.client.MongoCollection<org.bson.Document> eventsCollection, com.mongodb.client.MongoCollection<org.bson.Document> snapshotsCollection) | Ensures that the correct indexes are in place. |
protected EventEntryConfiguration | eventConfiguration() | Returns the EventEntryConfiguration that configures how event entries are to be stored. |
protected abstract Stream<? extends DomainEventData<?>> | extractEvents(org.bson.Document object) | Retrieves event data from the given Mongo object. |
protected Long | extractHighestSequenceNumber(org.bson.Document document) | Extracts the highest sequence number known from the entry represented by the given document. |
protected abstract DomainEventData<?> | extractSnapshot(org.bson.Document object) | Retrieves snapshot event data from the given Mongo object. |
List<? extends DomainEventData<?>> | findDomainEvents(com.mongodb.client.MongoCollection<org.bson.Document> collection, String aggregateIdentifier, long firstSequenceNumber, int batchSize) | Returns a batch of events for an aggregate with the given aggregateIdentifier and a sequence number equal to or higher than the given firstSequenceNumber. |
Optional<? extends DomainEventData<?>> | findLastSnapshot(com.mongodb.client.MongoCollection<org.bson.Document> snapshotCollection, String aggregateIdentifier) | Finds the entry containing the last snapshot event for an aggregate with the given aggregateIdentifier in the given collection. |
List<? extends TrackedEventData<?>> | findTrackedEvents(com.mongodb.client.MongoCollection<org.bson.Document> eventCollection, TrackingToken lastToken, int batchSize) | Returns a batch of tracked events with a tracking token above the given lastToken. |
Optional<Long> | lastSequenceNumberFor(com.mongodb.client.MongoCollection<org.bson.Document> eventsCollection, String aggregateIdentifier) | Returns the last known sequence number for an aggregate with the given aggregateIdentifier, whose events are stored in the given eventsCollection. |

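
The bounds in the method summary are easy to misread: findDomainEvents includes firstSequenceNumber, deleteSnapshots excludes sequenceNumber, and lastSequenceNumberFor may have nothing to return. The sketch below models those three contracts over a plain in-memory list. The class EventQuerySketch and its Entry type are hypothetical stand-ins; a real strategy would query a MongoCollection instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical in-memory model of the documented query semantics; not the Mongo-backed code.
class EventQuerySketch {
    // Minimal stand-in for a stored event or snapshot entry.
    static final class Entry {
        final String aggregateIdentifier;
        final long sequenceNumber;

        Entry(String aggregateIdentifier, long sequenceNumber) {
            this.aggregateIdentifier = aggregateIdentifier;
            this.sequenceNumber = sequenceNumber;
        }
    }

    // Entries of one aggregate with sequence number >= firstSequenceNumber,
    // in ascending order, capped at batchSize.
    static List<Entry> findDomainEvents(List<Entry> collection, String aggregateIdentifier,
                                        long firstSequenceNumber, int batchSize) {
        return collection.stream()
                .filter(e -> e.aggregateIdentifier.equals(aggregateIdentifier))
                .filter(e -> e.sequenceNumber >= firstSequenceNumber)
                .sorted(Comparator.comparingLong((Entry e) -> e.sequenceNumber))
                .limit(batchSize)
                .collect(Collectors.toList());
    }

    // Removes snapshots of one aggregate with sequence number strictly below sequenceNumber.
    static void deleteSnapshots(List<Entry> snapshots, String aggregateIdentifier, long sequenceNumber) {
        snapshots.removeIf(s -> s.aggregateIdentifier.equals(aggregateIdentifier)
                && s.sequenceNumber < sequenceNumber);
    }

    // Highest sequence number of one aggregate, or empty if it has no entries.
    static Optional<Long> lastSequenceNumberFor(List<Entry> collection, String aggregateIdentifier) {
        return collection.stream()
                .filter(e -> e.aggregateIdentifier.equals(aggregateIdentifier))
                .map(e -> e.sequenceNumber)
                .max(Long::compare);
    }

    public static void main(String[] args) {
        List<Entry> snapshots = new ArrayList<>(Arrays.asList(new Entry("a", 5), new Entry("a", 10)));
        deleteSnapshots(snapshots, "a", 10);
        System.out.println(snapshots.size()); // prints 1: the snapshot at sequence number 10 remains
    }
}
```
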
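protected static final int ORDER_ASC
The value to pass to Mongo to fetch documents in ascending order.

protected static final int ORDER_DESC
The value to pass to Mongo to fetch documents in descending order.
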
public AbstractMongoEventStorageStrategy(EventEntryConfiguration eventConfiguration, Duration lookBackTime)
Initializes a new StorageStrategy for an EventStorageEngine that uses Mongo.
Parameters:
eventConfiguration - configuration of the event entry 'schema'. If null, the schema with default values is used.
lookBackTime - the maximum time to look back when fetching new events while tracking. If null, a 10 second interval is used.

public void appendEvents(com.mongodb.client.MongoCollection<org.bson.Document> eventCollection, List<? extends EventMessage<?>> events, Serializer serializer)
Description copied from interface: StorageStrategy
Appends the given list of events to the given eventCollection. Use the given serializer to serialize the payload and metadata of the events.
Specified by: appendEvents in interface StorageStrategy
Parameters:
eventCollection - the document collection that contains event entries
events - the event messages to append to the event collection
serializer - the serializer used to serialize the event payload and metadata

protected abstract Stream<org.bson.Document> createEventDocuments(List<? extends EventMessage<?>> events, Serializer serializer)
Returns a stream of Mongo documents that represent the given batch of events. The given list of events represents events produced in the context of a single unit of work. Uses the given serializer to serialize event payload and metadata.
Parameters:
events - the events to convert to Mongo documents
serializer - the serializer to convert the events' payload and metadata

public void appendSnapshot(com.mongodb.client.MongoCollection<org.bson.Document> snapshotCollection, DomainEventMessage<?> snapshot, Serializer serializer)
Description copied from interface: StorageStrategy
Appends the given aggregate snapshot to the snapshotCollection. Use the given serializer to serialize the payload and metadata of the snapshot.
Specified by: appendSnapshot in interface StorageStrategy
Parameters:
snapshotCollection - the document collection that contains aggregate snapshot entries
snapshot - the snapshot event message to append to the snapshot collection
serializer - the serializer used to serialize the payload and metadata of the snapshot event

protected abstract org.bson.Document createSnapshotDocument(DomainEventMessage<?> snapshot, Serializer serializer)
Returns a Mongo document for the given snapshot event. Uses the given serializer to serialize event payload and metadata.
Parameters:
snapshot - the snapshot to convert
serializer - the serializer to convert the snapshot's payload and metadata

public void deleteSnapshots(com.mongodb.client.MongoCollection<org.bson.Document> snapshotCollection, String aggregateIdentifier, long sequenceNumber)
Description copied from interface: StorageStrategy
Delete all snapshots of the aggregate with given aggregateIdentifier from the given snapshotCollection, whose sequence number is lower than the given sequenceNumber.
Specified by: deleteSnapshots in interface StorageStrategy
Parameters:
snapshotCollection - the document collection that contains aggregate snapshot entries
aggregateIdentifier - the identifier of the aggregate for which to delete all snapshots
sequenceNumber - The sequenceNumber representing the upper bound (exclusive) of the snapshots to delete

public List<? extends DomainEventData<?>> findDomainEvents(com.mongodb.client.MongoCollection<org.bson.Document> collection, String aggregateIdentifier, long firstSequenceNumber, int batchSize)
Description copied from interface: StorageStrategy
Returns a batch of events for an aggregate with given aggregateIdentifier and a sequence number equal to or higher than the given firstSequenceNumber. The returned documents should be ordered chronologically (typically by using the sequence number). The size of the returned list should not exceed the given batchSize.
Specified by: findDomainEvents in interface StorageStrategy
Parameters:
collection - The collection in which to find the events
aggregateIdentifier - The identifier of the aggregate to query
firstSequenceNumber - The sequence number of the first event to return
batchSize - The maximum number of event entries to fetch

protected abstract Stream<? extends DomainEventData<?>> extractEvents(org.bson.Document object)
Retrieves event data from the given Mongo object.
Parameters:
object - the object to convert to event data

public List<? extends TrackedEventData<?>> findTrackedEvents(com.mongodb.client.MongoCollection<org.bson.Document> eventCollection, TrackingToken lastToken, int batchSize)
Description copied from interface: StorageStrategy
Returns a batch of tracked events with a tracking token above the given lastToken. If lastToken is null, the first (oldest) batch of entries in the store should be returned. The returned documents should be ordered chronologically (typically by using the timestamp of the event). The size of the returned list should not exceed the given batchSize.
Specified by: findTrackedEvents in interface StorageStrategy
Parameters:
eventCollection - The collection in which to find the events
lastToken - the token of the last event in the previous batch, or null to load the oldest batch
batchSize - The maximum number of event entries to fetch

public Optional<? extends DomainEventData<?>> findLastSnapshot(com.mongodb.client.MongoCollection<org.bson.Document> snapshotCollection, String aggregateIdentifier)
Description copied from interface: StorageStrategy
Finds the entry containing the last snapshot event for an aggregate with given aggregateIdentifier in the given collection.
Specified by: findLastSnapshot in interface StorageStrategy
Parameters:
snapshotCollection - The collection to find the last snapshot event in
aggregateIdentifier - The identifier of the aggregate to find a snapshot for

public Optional<Long> lastSequenceNumberFor(com.mongodb.client.MongoCollection<org.bson.Document> eventsCollection, String aggregateIdentifier)
Description copied from interface: StorageStrategy
Return the last known sequence number for an Aggregate with given aggregateIdentifier, whose Events are stored in the given eventsCollection.
Specified by: lastSequenceNumberFor in interface StorageStrategy
Parameters:
eventsCollection - The Collection in which to search for Events
aggregateIdentifier - The aggregate to find the last sequence number for

protected abstract DomainEventData<?> extractSnapshot(org.bson.Document object)
Retrieves snapshot event data from the given Mongo object.
Parameters:
object - the object to convert to snapshot data

protected Long extractHighestSequenceNumber(org.bson.Document document)
Extract the highest sequence number known from the entry represented by the given document.
This implementation takes the sequenceNumberProperty defined in the eventConfiguration.
Implementations that allow storage of multiple events in a single document should override this method.
Parameters:
document - The document representing the entry stored in Mongo

public void ensureIndexes(com.mongodb.client.MongoCollection<org.bson.Document> eventsCollection, com.mongodb.client.MongoCollection<org.bson.Document> snapshotsCollection)
Description copied from interface: StorageStrategy
Ensure that the correct indexes are in place.
Specified by: ensureIndexes in interface StorageStrategy
Parameters:
eventsCollection - The collection containing the documents representing commits and events.
snapshotsCollection - The collection containing the document representing snapshots

protected EventEntryConfiguration eventConfiguration()
Returns the EventEntryConfiguration that configures how event entries are to be stored.

Copyright © 2010–2018. All rights reserved.