
Buffer and flush Apache Beam streaming data

I have a streaming job that, on its initial run, will have to process a large amount of data. One of the DoFns calls a remote service that supports batch requests, so when working with bounded collections I use the following approach:

  private static final class Function extends DoFn<String, Void> implements Serializable {
    private static final long serialVersionUID = 2417984990958377700L;

    private static final int LIMIT = 500;

    private transient Queue<String> buffered;

    @StartBundle
    public void startBundle(Context context) throws Exception {
      buffered = new LinkedList<>();
    }

    @ProcessElement
    public void processElement(ProcessContext context) throws Exception {
      buffered.add(context.element());

      if (buffered.size() >= LIMIT) {
        flush();
      }
    }

    @FinishBundle
    public void finishBundle(Context c) throws Exception {
      // process remaining
      flush();
    }

    private void flush() {
      // build batch request
      while (!buffered.isEmpty()) {
        buffered.poll();
        // do something
      }
    }
  }

Is there a way to window data so the same approach can be used on unbounded collections?

I've tried the following:

pipeline
    .apply("Read", Read.from(source))
    .apply(WithTimestamps.of(input -> Instant.now()))
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(2L))))
    .apply("Process", ParDo.of(new Function()));

but startBundle and finishBundle are called for every element. Is there a way to get something like what RxJava offers (2-minute windows or 100-element bundles):

source
    .toFlowable(BackpressureStrategy.LATEST)
    .buffer(2, TimeUnit.MINUTES, 100) 
robosoul asked Mar 20 '17 21:03


1 Answer

This is a quintessential use case for the new per-key-and-window state and timers feature.

State is described in a Beam blog post, while for timers you'll have to rely on the Javadoc. Never mind what the Javadoc says about runner support; the true status is found in Beam's capability matrix.

The pattern is very much like what you have written, but state allows it to work with windows and also across bundles, which matters because bundles may be very small in streaming. Since state must be partitioned somehow to maintain parallelism, you'll need to add some sort of key. Currently there is no automatic sharding for this.
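Since the key has to come from somewhere, here is a minimal plain-Java sketch of one way to derive it: hash each element into a fixed number of shards. The `ShardKeys` class, its method names, and the idea of a fixed shard count are assumptions for illustration, not part of Beam. In a pipeline the same function would be applied by a keying step (for example Beam's `WithKeys` transform) before the stateful ParDo.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: derives a small, stable shard key for each element. */
final class ShardKeys {
  private ShardKeys() {}

  /** Maps an element to a shard in [0, numShards); floorMod keeps negative hash codes in range. */
  static int shardFor(String element, int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    return Math.floorMod(element.hashCode(), numShards);
  }

  /** Groups elements by shard, which is the partitioning that per-key state would see. */
  static Map<Integer, List<String>> partition(List<String> elements, int numShards) {
    Map<Integer, List<String>> byShard = new TreeMap<>();
    for (String element : elements) {
      byShard.computeIfAbsent(shardFor(element, numShards), k -> new ArrayList<>()).add(element);
    }
    return byShard;
  }
}
```

The shard count is a trade-off: it caps how many buffers can fill in parallel, while a larger count means each buffer fills more slowly and more batches are flushed by the timer instead of by size. The stateful DoFn below then receives elements keyed this way.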

private static final class Function extends DoFn<KV<Key, String>, Void> implements Serializable {
  private static final long serialVersionUID = 2417984990958377700L;

  private static final int LIMIT = 500;

  @StateId("bufferedSize")
  private final StateSpec<Object, ValueState<Integer>> bufferedSizeSpec =
      StateSpecs.value(VarIntCoder.of());

  @StateId("buffered")
  private final StateSpec<Object, BagState<String>> bufferedSpec =
      StateSpecs.bag(StringUtf8Coder.of());

  @TimerId("expiry")
  private final TimerSpec expirySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void processElement(
      ProcessContext context,
      BoundedWindow window,
      @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
      @StateId("buffered") BagState<String> bufferedState,
      @TimerId("expiry") Timer expiryTimer) {

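    // firstNonNull is assumed to be Guava's MoreObjects.firstNonNull (static import);
    // the size state reads as null until the first write for this key and window.
    int size = firstNonNull(bufferedSizeState.read(), 0);
    bufferedState.add(context.element().getValue());
    size += 1;
    bufferedSizeState.write(size);
    // allowedLateness is not defined in this snippet; it is assumed to be a Duration
    // field matching the allowed lateness configured on the windowing strategy.
    expiryTimer.set(window.maxTimestamp().plus(allowedLateness));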

    if (size >= LIMIT) {
      flush(context, bufferedState, bufferedSizeState);
    }
  }

  @OnTimer("expiry")
  public void onExpiry(
      OnTimerContext context,
      @StateId("bufferedSize") ValueState<Integer> bufferedSizeState,
      @StateId("buffered") BagState<String> bufferedState) {
    flush(context, bufferedState, bufferedSizeState);
  }

  private void flush(
      WindowedContext context,
      BagState<String> bufferedState,
      ValueState<Integer> bufferedSizeState) {
    Iterable<String> buffered = bufferedState.read();

    // build batch request from buffered
    ...

    // clear things
    bufferedState.clear();
    bufferedSizeState.clear();
  }
}

A few notes:

  • State replaces your DoFn's instance variables, since instance variables have no cohesion across windows.
  • The buffer and the size are initialized lazily, as needed, instead of in @StartBundle.
  • The BagState supports "blind" writes, so there doesn't need to be any read-modify-write, just committing the new elements in the same way as when you output.
  • Setting a timer repeatedly for the same time is just fine; it should mostly be a no-op.
  • @OnTimer("expiry") takes the place of @FinishBundle, since finishing a bundle is not a per-window thing but an artifact of how a runner executes your pipeline.
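To make the interplay of these notes concrete, here is a minimal plain-Java model of the same control flow. It is not Beam API: the `BufferModel` class, its maps standing in for state and timers, and the `advanceTo` method standing in for the watermark are all assumptions for illustration. This model flushes as soon as the buffer holds `limit` elements; adjust the comparison to whatever your service accepts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model (not Beam API) of per-key buffering flushed by size or by an expiry timer. */
final class BufferModel {
  private final int limit;
  private final Map<String, List<String>> buffered = new HashMap<>(); // stands in for BagState
  private final Map<String, Long> expiry = new HashMap<>();           // stands in for the timer
  final List<List<String>> batches = new ArrayList<>();               // stands in for remote calls

  BufferModel(int limit) {
    this.limit = limit;
  }

  /** Mirrors @ProcessElement: append, (re)set the timer, flush once the limit is reached. */
  void process(String key, String value, long windowEndMillis) {
    buffered.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    expiry.put(key, windowEndMillis); // setting the same time again changes nothing
    if (buffered.get(key).size() >= limit) {
      flush(key);
    }
  }

  /** Mirrors @OnTimer: fire every timer whose time has passed and flush what is left. */
  void advanceTo(long nowMillis) {
    for (String key : new ArrayList<>(expiry.keySet())) {
      if (expiry.get(key) <= nowMillis) {
        flush(key);
        expiry.remove(key);
      }
    }
  }

  /** Emits the buffered elements as one batch and clears the buffer; empty buffers emit nothing. */
  private void flush(String key) {
    List<String> batch = buffered.remove(key);
    if (batch != null && !batch.isEmpty()) {
      batches.add(batch);
    }
  }
}
```

With a limit of 3, three calls to `process` for one key produce one batch immediately, and a fourth element stays buffered until `advanceTo` passes the window end, which is the role the expiry timer plays in the DoFn.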

All that said, if you are writing to an external system, you may want to reify the windows (attach each element's window to it as data) and re-window into the global window before doing the writes. The manner of each write can then depend on the reified window, since "the external world is globally windowed".
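As a rough plain-Java illustration of what reifying a window means, the sketch below pairs each value with the start of its fixed window and derives a write target from it. Nothing here is Beam API: the `ReifiedWindows` class, the epoch-aligned window arithmetic, and the `batch-<millis>` naming scheme are assumptions chosen only to show the idea.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java illustration (not Beam API) of carrying the window along as data. */
final class ReifiedWindows {
  private ReifiedWindows() {}

  /** A value paired with the start of the fixed window it was assigned to. */
  static final class Windowed {
    final String value;
    final Instant windowStart;

    Windowed(String value, Instant windowStart) {
      this.value = value;
      this.windowStart = windowStart;
    }
  }

  /** Start of the epoch-aligned fixed window of the given size that contains the timestamp. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long ts = timestamp.toEpochMilli();
    return Instant.ofEpochMilli(ts - Math.floorMod(ts, sizeMillis));
  }

  /** Attaches the window to the value so later steps no longer need windowing context. */
  static Windowed reify(String value, Instant timestamp, Duration size) {
    return new Windowed(value, windowStart(timestamp, size));
  }

  /** Hypothetical write target: the window decides where the value goes. */
  static String destination(Windowed windowed) {
    return "batch-" + windowed.windowStart.toEpochMilli();
  }
}
```

Once the window travels with the element, the writing step can run in the global window and still route or label each write by the window the element originally belonged to.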

Kenn Knowles answered Nov 20 '22 07:11
