public abstract class AbstractEventStore extends AbstractEventBus implements EventStore
EventStore
that uses a EventStorageEngine
to store and load events.Modifier | Constructor and Description |
---|---|
protected |
AbstractEventStore(EventStorageEngine storageEngine)
Initializes an event store with given
storageEngine and NoOpMessageMonitor . |
protected |
AbstractEventStore(EventStorageEngine storageEngine,
MessageMonitor<? super EventMessage<?>> messageMonitor)
Initializes an event store with given
storageEngine and messageMonitor . |
Modifier and Type | Method and Description |
---|---|
TrackingToken |
createHeadToken()
Creates the token at the end of an event stream.
|
TrackingToken |
createTailToken()
Creates the token at the beginning of an event stream.
|
TrackingToken |
createTokenAt(Instant dateTime)
Creates a token that tracks all events after given
dateTime . |
protected Optional<DomainEventMessage<?>> |
handleSnapshotReadingError(String aggregateIdentifier,
Throwable e)
Invoked when an error (
Exception or LinkageError ) occurs while attempting to read a snapshot
event. |
Optional<Long> |
lastSequenceNumberFor(String aggregateIdentifier)
Returns the last known sequence number of an Event for the given
aggregateIdentifier . |
protected void |
prepareCommit(List<? extends EventMessage<?>> events)
Process given
events while the Unit of Work root is preparing for commit. |
DomainEventStream |
readEvents(String aggregateIdentifier)
Open an event stream containing all domain events belonging to the given
aggregateIdentifier . |
DomainEventStream |
readEvents(String aggregateIdentifier,
long firstSequenceNumber)
Open an event stream containing all domain events belonging to the given
aggregateIdentifier . |
protected Stream<? extends DomainEventMessage<?>> |
stagedDomainEventMessages(String aggregateIdentifier)
Returns a Stream of all DomainEventMessages that have been staged for publication by an Aggregate with given
aggregateIdentifier . |
protected EventStorageEngine |
storageEngine()
Returns the
EventStorageEngine used by the event store. |
void |
storeSnapshot(DomainEventMessage<?> snapshot)
Stores the given (temporary)
snapshot event. |
afterCommit, commit, intercept, publish, queuedMessages, registerDispatchInterceptor, subscribe
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
openStream, publish, publish, registerDispatchInterceptor
subscribe
createTokenSince
protected AbstractEventStore(EventStorageEngine storageEngine)
storageEngine
and NoOpMessageMonitor
.storageEngine
- The storage engine used to store and load eventsprotected AbstractEventStore(EventStorageEngine storageEngine, MessageMonitor<? super EventMessage<?>> messageMonitor)
storageEngine
and messageMonitor
.storageEngine
- The storage engine used to store and load eventsmessageMonitor
- The monitor used to record event publicationsprotected void prepareCommit(List<? extends EventMessage<?>> events)
AbstractEventBus
events
while the Unit of Work root is preparing for commit. The default implementation
signals the registered MessageMonitor
that the given events are ingested and passes the events to each
registered event processor.prepareCommit
in class AbstractEventBus
events
- Events to be published by this Event Buspublic DomainEventStream readEvents(String aggregateIdentifier)
aggregateIdentifier
.
The returned stream is finite, ending with the last known event of the aggregate. If the event store holds no events of the given aggregate an empty stream is returned.
This implementation returns a DomainEventStream
starting with the last stored snapshot of the aggregate
followed by subsequent domain events.
readEvents
in interface EventStore
aggregateIdentifier
- the identifier of the aggregate whose events to fetchprotected Optional<DomainEventMessage<?>> handleSnapshotReadingError(String aggregateIdentifier, Throwable e)
Exception
or LinkageError
) occurs while attempting to read a snapshot
event. This method can be overridden to change the default behavior, which is to log the exception (warn level)
and ignore the snapshot.
Overriding implementations may choose to return normally, or raise an exception. Exceptions raised from this
method are propagated to the caller of the readEvents(String)
or readEvents(String, long)
methods.
Returning an empty Optional will force the initialization of the aggregate to happen based on the entire event stream of that aggregate.
aggregateIdentifier
- The identifier of the aggregate for which an snapshot failed to loade
- The exception or error that occurred while loading or deserializing the snapshotRuntimeException
- any runtimeException to fail loading theprotected Stream<? extends DomainEventMessage<?>> stagedDomainEventMessages(String aggregateIdentifier)
aggregateIdentifier
.aggregateIdentifier
- The identifier of the aggregate to get staged events forpublic DomainEventStream readEvents(String aggregateIdentifier, long firstSequenceNumber)
EventStore
aggregateIdentifier
.
The returned stream is finite, ending with the last known event of the aggregate. If the event store holds no events of the given aggregate an empty stream is returned.
The default implementation invokes EventStore.readEvents(String)
and then filters out events with a sequence number
smaller than firstSequenceNumber
.
readEvents
in interface EventStore
aggregateIdentifier
- the identifier of the aggregate whose events to fetchfirstSequenceNumber
- the expected sequence number of the first event in the returned streampublic void storeSnapshot(DomainEventMessage<?> snapshot)
EventStore
snapshot
event. This snapshot replaces the segment of the event stream
identified by the snapshot
's Aggregate Identifier
up
to (and including) the event with the snapshot
's sequence
number
.
These snapshots will only affect the DomainEventStream
returned by the EventStore.readEvents(String)
method. They do not change the events returned by EventBus.openStream(TrackingToken)
or those received
by using SubscribableMessageSource.subscribe(java.util.function.Consumer)
.
Note that snapshots are considered a temporary replacement for Events, and are used as performance optimization. Event Store implementations may choose to ignore or delete snapshots.
storeSnapshot
in interface EventStore
snapshot
- The snapshot to replace part of the DomainEventStream.protected EventStorageEngine storageEngine()
EventStorageEngine
used by the event store.public Optional<Long> lastSequenceNumberFor(String aggregateIdentifier)
EventStore
aggregateIdentifier
.
It is preferred to retrieve the last known sequence number from the Domain Event Stream when sourcing an Aggregate from events. However, this method provides an alternative in cases no events have been read. For example when using state storage.
lastSequenceNumberFor
in interface EventStore
aggregateIdentifier
- The identifier of the aggregate to find the highest sequence forpublic TrackingToken createTailToken()
StreamableMessageSource
The default behavior for this method is to return null
, which always represents the tail position of a
stream. However, implementations are encouraged to return an instance that explicitly represents the tail
of the stream.
createTailToken
in interface StreamableMessageSource<TrackedEventMessage<?>>
public TrackingToken createHeadToken()
StreamableMessageSource
createHeadToken
in interface StreamableMessageSource<TrackedEventMessage<?>>
public TrackingToken createTokenAt(Instant dateTime)
StreamableMessageSource
dateTime
. If there is an event exactly at the given
dateTime
, it will be tracked too.createTokenAt
in interface StreamableMessageSource<TrackedEventMessage<?>>
dateTime
- The date and time for determining criteria how the tracking token should be created. A tracking
token should point at very first event before this date and time.dateTime
, if there aren't events matching this criteria null
is returnedCopyright © 2010–2018. All rights reserved.