Interface StreamingEventProcessor

All Superinterfaces:
DescribableComponent, EventProcessor
All Known Implementing Classes:
PooledStreamingEventProcessor

public interface StreamingEventProcessor extends EventProcessor
An EventProcessor which processes an event stream in segments. Segmenting the stream of events allows for parallelization of event processing, effectively enhancing the processing speed.

A StreamingEventProcessor uses a TokenStore to store the progress of each of the segments it is processing. Furthermore, it allows for segment interactions like:

Since:
4.5
Author:
Allard Buijze, Steven van Beelen
  • Method Details

    • getTokenStoreIdentifier

      String getTokenStoreIdentifier()
      Returns the unique identifier of the TokenStore used by this StreamingEventProcessor.
      Returns:
      the unique identifier of the TokenStore used by this StreamingEventProcessor
      Throws:
      UnableToRetrieveIdentifierException - if the TokenStore was unable to retrieve it
    • releaseSegment

      CompletableFuture<Void> releaseSegment(int segmentId)
      Instructs the processor to release the segment with given segmentId.
      Parameters:
      segmentId - the id of the segment to release
    • releaseSegment

      CompletableFuture<Void> releaseSegment(int segmentId, long releaseDuration, TimeUnit unit)
      Instructs the processor to release the segment with given segmentId. This processor will not try to claim the given segment for the specified releaseDuration in the given unit, to ensure it is not immediately reclaimed. Note that this will override any previous release duration that existed for this segment. Providing a negative value will allow the segment to be immediately claimed.

      If the processor is not actively processing the segment with given segmentId, claiming it will be ignored for the given timeframe nonetheless.

      Parameters:
      segmentId - the id of the segment to be released for the specified releaseDuration
      releaseDuration - the amount of time to disregard segmentId for processing
      unit - the unit of time used to express the releaseDuration
    • claimSegment

      default CompletableFuture<Boolean> claimSegment(int segmentId)
      Instructs the processor to claim the segment with given segmentId and start processing it as soon as possible.

      The given segmentId must not be currently processed by a different processor instance, as that will have an active claim on the token. Claiming a segment that is already being processed will have no effect and return true.

      A true return value indicates that the segment has been claimed and will be processed by this processor. The StreamingEventProcessor may postpone start of work until after completion of this task, as long as the token has been claimed so work can be started. A return value of false indicates that the segment has not been claimed due to the token for that segment not being available.

      Parameters:
      segmentId - the identifier of the segment to claim and start processing
      Returns:
      a CompletableFuture providing the result of the claim operation
    • splitSegment

      CompletableFuture<Boolean> splitSegment(int segmentId)
      Instruct the processor to split the segment with given segmentId into two segments, allowing an additional process to start processing events from it.

      To be able to split segments, the TokenStore configured with this processor must use explicitly initialized tokens. Also, the given segmentId must be currently processed by a process owned by this processor instance.

      Parameters:
      segmentId - the identifier of the segment to split
      Returns:
      a CompletableFuture providing the result of the split operation
    • mergeSegment

      CompletableFuture<Boolean> mergeSegment(int segmentId)
      Instruct the processor to merge the segment with given segmentId back with the segment that it was originally split from. The processor must be able to claim the other segment, in order to merge it. Therefore, this other segment must not have any active claims in the TokenStore.

      The processor must currently be actively processing the segment with given segmentId.

      Use releaseSegment(int) to force this processor to release any claims with tokens required to merge the segments.

      To find out which segment a given segmentId should be merged with, use the following procedure:

           EventTrackerStatus status = processor.processingStatus().get(segmentId);
           if (status == null) {
               // this processor is not processing segmentId, and will not be able to merge
           }
           return status.getSegment().mergeableSegmentId();
       
      Parameters:
      segmentId - the identifier of the segment to merge
      Returns:
      a CompletableFuture indicating whether the merge was executed successfully
    • supportsReset

      boolean supportsReset()
      Indicates whether this StreamingEventProcessor supports a "reset". Generally, a reset is supported if at least one of the Event Handling Components assigned to this processor supports it, and no handlers explicitly prevent the resets.

      This method should be invoked prior to invoking any of the resetTokens() operations as an early validation.

      Returns:
      true if resets are supported, false otherwise
    • resetTokens

      CompletableFuture<Void> resetTokens()
      Resets tokens to their initial state. This effectively causes a replay.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

    • resetTokens

      <R> CompletableFuture<Void> resetTokens(@Nullable R resetContext)
      Resets tokens to their initial state. This effectively causes a replay. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Type Parameters:
      R - the type of the provided resetContext
      Parameters:
      resetContext - a R used to support the reset operation
    • resetTokens

      CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource,CompletableFuture<TrackingToken>> initialTrackingTokenSupplier)
      Reset tokens to the position as return by the given initialTrackingTokenSupplier. This effectively causes a replay since that position.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Parameters:
      initialTrackingTokenSupplier - a function returning the token representing the position to reset to
    • resetTokens

      <R> CompletableFuture<Void> resetTokens(@Nonnull Function<TrackingTokenSource,CompletableFuture<TrackingToken>> initialTrackingTokenSupplier, @Nullable R resetContext)
      Reset tokens to the position as return by the given initialTrackingTokenSupplier. This effectively causes a replay since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Type Parameters:
      R - the type of the provided resetContext
      Parameters:
      initialTrackingTokenSupplier - a function returning the token representing the position to reset to
      resetContext - a R used to support the reset operation
    • resetTokens

      default CompletableFuture<Void> resetTokens(@Nonnull TrackingToken startPosition)
      Resets tokens to the given startPosition. This effectively causes a replay of events since that position.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Parameters:
      startPosition - the token representing the position to reset the processor to
    • resetTokens

      <R> CompletableFuture<Void> resetTokens(@Nonnull TrackingToken startPosition, @Nullable R resetContext)
      Resets tokens to the given startPosition. This effectively causes a replay of events since that position. The given resetContext will be used to support the (optional) reset operation in an Event Handling Component.

      Note that the new token must represent a position that is before the current position of the processor.

      Before attempting to reset the tokens, the caller must stop this processor, as well as any instances of the same logical processor that may be running in the cluster. Failure to do so will cause the reset to fail, as a processor can only reset the tokens if it is able to claim them all.

      Type Parameters:
      R - the type of the provided resetContext
      Parameters:
      startPosition - the token representing the position to reset the processor to
      resetContext - a R used to support the reset operation
    • maxCapacity

      int maxCapacity()
      Specifies the maximum amount of segments this EventProcessor can process at the same time.
      Returns:
      the maximum amount of segments this EventProcessor can process at the same time
    • processingStatus

      Map<Integer,EventTrackerStatus> processingStatus()
      Returns the status for each of the segments processed by this processor as EventTrackerStatus instances. The key of the Map represent the segment ids processed by this instance. The values of the returned Map represent the last known status of that segment.

      Note that the returned Map is unmodifiable, but does reflect any changes made to the status as the processor is processing Events.

      Returns:
      the status for each of the segments processed by the current processor
    • isReplaying

      default boolean isReplaying()
      Returns the overall replay status of this StreamingEventProcessor. Any other instances of this streaming processor running on other applications are not not taken into account in this calculation.

      Note that when an EventTrackerStatus returns true for both EventTrackerStatus.isReplaying() and EventTrackerStatus.isCaughtUp(), that the replay is done but the processor did not handle any new events yet.

      Returns:
      true if any of the segments is still replaying and not caught up yet, false otherwise