public class MongoTokenStore extends Object implements TokenStore
Constructor and Description |
---|
MongoTokenStore(MongoTemplate mongoTemplate,
Serializer serializer)
Creates a MongoTokenStore with a default claim timeout of 10 seconds, a default owner identifier and a default
content type.
|
MongoTokenStore(MongoTemplate mongoTemplate,
Serializer serializer,
TemporalAmount claimTimeout,
String nodeId,
Class<?> contentType)
Creates a MongoTokenStore by using the given values.
|
Modifier and Type | Method and Description |
---|---|
void |
ensureIndexes()
Creates the indexes required to work with the TokenStore.
|
void |
extendClaim(String processorName,
int segment)
Extends the claim on the current token held by the this node for the given
processorName and
segment . |
int[] |
fetchSegments(String processorName)
Returns an array of known
segments for a given processorName . |
TrackingToken |
fetchToken(String processorName,
int segment)
|
void |
initializeTokenSegments(String processorName,
int segmentCount)
Initializes the given
segmentCount number of segments for the given processorName to track its
tokens. |
void |
releaseClaim(String processorName,
int segment)
Release a claim of the token for given
processorName and segment . |
void |
storeToken(TrackingToken token,
String processorName,
int segment)
Stores the given
token in the store. |
public MongoTokenStore(MongoTemplate mongoTemplate, Serializer serializer)
mongoTemplate
- used to access the collection in which tracking tokens are storedserializer
- serializer used to serialize tracking tokenspublic MongoTokenStore(MongoTemplate mongoTemplate, Serializer serializer, TemporalAmount claimTimeout, String nodeId, Class<?> contentType)
mongoTemplate
- used to access the collection in which tracking tokens are storedserializer
- serializer used to serialize TrackingTokenclaimTimeout
- the amount of time after which a claim is automatically releasednodeId
- the owner identifier that this token store usescontentType
- the data type of the serialized tracking tokenpublic void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException
TokenStore
token
in the store. The token marks the current position of the process with given
processorName
and segment
. The given token
may be null
.
Any claims made by the current process have their timestamp updated.storeToken
in interface TokenStore
token
- The token to store for a given process and segment. May be null
.processorName
- The name of the process for which to store the tokensegment
- The index of the segment for which to store the tokenUnableToClaimTokenException
- when the token being updated has been claimed by another process.public void initializeTokenSegments(String processorName, int segmentCount) throws UnableToClaimTokenException
TokenStore
segmentCount
number of segments for the given processorName
to track its
tokens. This method should only be invoked when no tokens have been stored for the given processor, yet.
This method will initialize the tokens, but no claim them. It will create the segments ranging from 0
until segmentCount - 1
.
The exact behavior when this method is called while tokens were already present, is undefined in case the token already present is not owned by the initializing process.
initializeTokenSegments
in interface TokenStore
processorName
- The name of the processor to initialize segments forsegmentCount
- The number of segments to initializeUnableToClaimTokenException
- when a segment has already been createdpublic TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException
token
for the given processorName
and segment
.
Returns null
if the store holds no token or if the stored token for the given process and segment is
null
.
The token will be claimed by the current process (JVM instance), preventing access by other instances. To release
the claim, use TokenStore.releaseClaim(String, int)
fetchToken
in interface TokenStore
processorName
- The process name for which to fetch the tokensegment
- The segment index for which to fetch the tokennull
if the store holds no token for given process and segmentUnableToClaimTokenException
- if there is a token for given processorName
and segment
, but
they are claimed by another process.public void extendClaim(String processorName, int segment) throws UnableToClaimTokenException
TokenStore
processorName
and
segment
.extendClaim
in interface TokenStore
processorName
- The process name for which to fetch the tokensegment
- The segment index for which to fetch the tokenUnableToClaimTokenException
- if there is no token for given processorName
and segment
, or
if it has been claimed by another process.public void releaseClaim(String processorName, int segment)
processorName
and segment
. If no such claim existed,
nothing happens.
The caller must ensure not to use any streams opened based on the token for which the claim is released.
releaseClaim
in interface TokenStore
processorName
- The name of the process owning the token (e.g. a TrackingEventProcessor name)segment
- the segment for which a token was obtainedpublic int[] fetchSegments(String processorName)
TokenStore
segments
for a given processorName
.
The segments returned are segments for which a token has been stored previously. When the TokenStore
is
empty, an empty array is returned.
fetchSegments
in interface TokenStore
processorName
- The process name for which to fetch the segments@PostConstruct public void ensureIndexes()
Copyright © 2010–2017. All rights reserved.