
Processing Total Ordering of Events By Key using Apache Beam

Problem Context

I am trying to generate a total (linear) order of event items per key from a real-time stream, where the ordering is by event time (derived from the event payload).

Approach

I attempted to implement this with a streaming pipeline as follows (a sketch of this configuration follows the list):

1) Set up non-overlapping, sequential (fixed) windows, e.g. of 5 minutes duration

2) Establish an allowed lateness - it is fine to discard late events

3) Set accumulation mode to retain all fired panes

4) Use the "AfterwaterMark" trigger

5) When handling a triggered pane, only consider the pane if it is the final one

6) Use GroupByKey to ensure all events in this window for this key are processed as a unit on a single resource
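
Roughly, the configuration I am describing looks like this (the Event type and the one-minute allowed lateness are just placeholders, not exactly what my pipeline uses):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Steps 1-4 and 6: fixed 5-minute windows, an allowed lateness after which
// late events are discarded, accumulating panes, the watermark trigger,
// then a per-key grouping.
PCollection<KV<String, Iterable<Event>>> grouped = events
    .apply(Window.<KV<String, Event>>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(AfterWatermark.pastEndOfWindow())
        .withAllowedLateness(Duration.standardMinutes(1))
        .accumulatingFiredPanes())
    .apply(GroupByKey.<String, Event>create());

// Step 5: only act on the final pane for each window.
grouped.apply(ParDo.of(new DoFn<KV<String, Iterable<Event>>, Void>() {
  @ProcessElement
  public void process(ProcessContext c) {
    if (c.pane().isLast()) {
      // process the complete window contents for this key
    }
  }
}));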

While this approach ensures linear order for each key within a given window, it does not guarantee that order across windows: a later window of events for the same key could be processed at the same time as an earlier window. This could easily happen if the earlier window failed and had to be retried.

I'm considering adapting this approach: the real-time stream would first be processed to partition the events by key and write them to files named by their window range. Due to the parallel nature of Beam processing, these files will also be generated out of order. A single-process coordinator could then submit these files sequentially to a batch pipeline, only submitting the next one once it has received the previous file and its downstream processing has completed successfully.
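
As a sketch of what I have in mind for the coordinator (the file naming scheme, outputDir and submitToBatchPipeline are hypothetical, not Beam APIs):

import java.nio.file.Files;
import java.nio.file.Path;
import org.joda.time.Duration;
import org.joda.time.Instant;

// Single-process coordinator loop: submit window files strictly in order,
// waiting for each one to appear and for its batch run to complete.
void coordinate(Path outputDir, Instant firstWindowStart) throws Exception {
  Duration windowSize = Duration.standardMinutes(5);
  Instant windowStart = firstWindowStart;
  while (true) {
    Path file = outputDir.resolve(windowStart + "_" + windowStart.plus(windowSize));
    if (Files.exists(file)) {
      submitToBatchPipeline(file);  // hypothetical; blocks until downstream processing succeeds
      windowStart = windowStart.plus(windowSize);
    } else {
      // Here I cannot tell "window had no events" apart from "file not yet
      // written" - this is the missing-file ambiguity described below.
      Thread.sleep(30_000);
    }
  }
}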

The problem is that Apache Beam will only fire a pane if there was at least one element in that time window. Thus if there are gaps in the events, there can be gaps in the files that are generated - i.e. missing files. The problem with missing files is that the coordinating batch processor cannot distinguish between a time window that passed with no data and a failure, in which case it cannot proceed until the file finally arrives.

One way to force the event windows to trigger might be to somehow add dummy events to the stream for each partition and time window. However, this is tricky to do: if there are large gaps in the time sequence, a dummy event may arrive surrounded by much later events and be discarded as late.
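
For illustration, the kind of dummy-event injection I mean might look like this (DUMMY_KEYS and Event.dummy() are placeholders, and the heartbeat elements get their timestamps from GenerateSequence rather than from the event payload, which is exactly where the lateness problem arises):

import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;

// Emit one dummy event per key per window period so every window has at
// least one element and therefore fires a pane.
PCollection<KV<String, Event>> heartbeats = pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .apply(ParDo.of(new DoFn<Long, KV<String, Event>>() {
      @ProcessElement
      public void process(ProcessContext c) {
        for (String key : DUMMY_KEYS) {
          c.output(KV.of(key, Event.dummy()));
        }
      }
    }));

// Merge the heartbeats with the real stream before windowing.
PCollection<KV<String, Event>> merged =
    PCollectionList.of(events).and(heartbeats).apply(Flatten.pCollections());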

Are there other approaches to ensuring there is a trigger for every possible event window, even if that results in outputting empty files?

Is generating a total ordering by key from a real-time stream a tractable problem with Apache Beam? Is there another approach I should be considering?

asked Aug 25 '17 by mark


1 Answer

Depending on your definition of tractable, it is certainly possible to totally order a stream per key by event timestamp in Apache Beam.

Here are the considerations behind the design:

  1. Apache Beam does not guarantee in-order delivery between transforms, so an ordered intermediate result would be of no use within a pipeline. So I will assume you are doing this so you can write to an external system that can only handle events if they arrive in order.
  2. If an event has timestamp t, you can never be certain that no earlier event will arrive unless you wait until t is droppable (i.e. the watermark has passed t plus the allowed lateness).

So here's how we'll do it:

  1. We'll write a ParDo that uses state and timers (blog post still under review) in the global window. This makes it a per-key workflow.
  2. We'll buffer elements in state as they arrive, so your allowed lateness affects how efficient a data structure you need. What you need is a heap, so you can peek and pop the element with the minimum timestamp; there's no built-in heap state, so I'll just write it as a ValueState.
  3. We'll set an event-time timer to receive a callback when an element's timestamp can no longer be contradicted.

I'm going to assume a custom EventHeap data structure for brevity. In practice, you'd want to break this up into multiple state cells to minimize the data transferred. A heap might be a reasonable addition to the primitive types of state.
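
For concreteness, one possible shape for that EventHeap might be the following (the Event type, its getTimestamp() accessor, and the String keys are assumptions; in a real pipeline you'd register a Coder for this class so it can live in ValueState):

import java.io.Serializable;
import java.util.Comparator;
import java.util.PriorityQueue;
import org.joda.time.Instant;

// Minimal per-key heap of events ordered by timestamp.
public class EventHeap implements Serializable {
  private final String key;
  private final PriorityQueue<Event> events =
      new PriorityQueue<>(Comparator.comparing(Event::getTimestamp));

  private EventHeap(String key) {
    this.key = key;
  }

  public static EventHeap createForKey(String key) {
    return new EventHeap(key);
  }

  public void add(Event event) {
    events.add(event);
  }

  public boolean isEmpty() {
    return events.isEmpty();
  }

  // Timestamp of the earliest buffered event; only valid when non-empty.
  public Instant nextTimestamp() {
    return events.peek().getTimestamp();
  }

  public Event pop() {
    return events.poll();
  }
}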

I will also assume that all the coders we need are already registered and focus on the state and timers logic.

new DoFn<KV<K, Event>, Void>() {

  // allowedLateness is assumed to be a Duration field available to this DoFn.

  @StateId("heap")
  private final StateSpec<ValueState<EventHeap>> heapSpec = StateSpecs.value();

  @TimerId("next")
  private final TimerSpec nextTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = firstNonNull(
      heapState.read(),
      EventHeap.createForKey(ctx.element().getKey()));
    heap.add(ctx.element().getValue());
    // Persist the updated heap; mutations to the read value are not
    // automatically written back to state.
    heapState.write(heap);
    // When the watermark reaches this time, no more elements
    // with earlier timestamps can show up.
    nextTimer.set(heap.nextTimestamp().plus(allowedLateness));
  }

  @OnTimer("next")
  public void onNextTimestamp(
      OnTimerContext ctx,
      @StateId("heap") ValueState<EventHeap> heapState,
      @TimerId("next") Timer nextTimer) {
    EventHeap heap = heapState.read();
    // If the timer set for time t fired, the watermark must be
    // strictly greater than t.
    while (!heap.isEmpty() && !heap.nextTimestamp().isAfter(ctx.timestamp())) {
      writeToExternalSystem(heap.pop());
    }
    heapState.write(heap);
    // Re-arm the timer only if there are still buffered elements.
    if (!heap.isEmpty()) {
      nextTimer.set(heap.nextTimestamp().plus(allowedLateness));
    }
  }
}
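
Applying it might look something like this, assuming the input is already keyed and placed in the global window (SortPerKeyFn is just a name standing in for the anonymous DoFn above):

events
    .apply(Window.<KV<String, Event>>into(new GlobalWindows()))
    .apply("SortPerKey", ParDo.of(new SortPerKeyFn()));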

This should hopefully get you started on the way towards whatever your underlying use case is.

answered by Kenn Knowles