
Issues with Stateful processing in Apache Beam

I've read both of Beam's articles on stateful processing and timely processing, but I ran into problems implementing the functions myself.

The problem I am trying to solve is similar to this one: generating a sequential index for every line, because I want to be able to map each line produced by Dataflow back to its line in the original source.

public static class createIndex extends DoFn<String, KV<String, String>> {
  // Running count of the lines seen so far.
  @StateId("count")
  private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState) {
    String val = c.element();

    // read() returns null until the first write, so start from 0 in that case.
    Long stored = countState.read();
    long count = (stored == null ? 0L : stored) + 1;
    countState.write(count);

    // Emit the line keyed by its index.
    c.output(KV.of(String.valueOf(count), val));
  }
}

Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new createIndex()));

I followed whatever I could find online and looked at the source code of ParDo, but I was not sure what needed to be done. The error I am getting is:

java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers.

I looked up examples here and here.

I realize this is a simple problem, but for lack of sufficient examples or documentation I was unable to fix it. I'd appreciate any help. Thanks!

Haris Nadeem asked Apr 25 '18 21:04


1 Answer

OK, so I continued working on the problem and, after reading some of the source, I was able to resolve the issue. It turns out that a ParDo whose DoFn uses state requires its input elements to be of the form KV<K, V>, because state is scoped per key (and window).

Therefore, in order to read the file and create an index for each line, I first need to wrap each line in a key-value pair. I added the following code:

public static class FakeKvPair extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}

And changed the pipeline to:

Pipeline p = Pipeline.create(options);

p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new FakeKvPair()))
 .apply(ParDo.of(new createIndex()));
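To illustrate why a key is needed at all, here is a minimal plain-Java sketch that models state as one counter cell per key. It is not Beam code: `PerKeyCounter`, `next` and `indexWithConstantKey` are hypothetical names made up for this illustration, and the only assumption taken from Beam is that state is scoped per key. With the constant key "" that FakeKvPair emits, every line shares a single counter.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key state; hypothetical names, not part of Beam.
class PerKeyCounter {
    // One counter cell per key, analogous to one ValueState<Long> per key.
    private final Map<String, Long> counts = new HashMap<>();

    // Increments the counter for the given key and returns the new value.
    long next(String key) {
        long count = counts.getOrDefault(key, 0L) + 1;
        counts.put(key, count);
        return count;
    }

    // Indexes every line under the constant key "", mirroring
    // FakeKvPair followed by createIndex.
    static List<String> indexWithConstantKey(List<String> lines) {
        PerKeyCounter state = new PerKeyCounter();
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            out.add(state.next("") + "=" + line);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(indexWithConstantKey(List.of("a", "b", "c")));
    }
}
```

If the lines were keyed by anything other than a constant, each distinct key would get its own counter starting from 1, which is why the constant key is what makes the index global in this model.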

The new problem this raises is whether the order of the lines is preserved, since I am running an extra ParDo (which might change the order of the lines) before they are fed to createIndex.

On my local machine the order is preserved, but I am not sure how that scales to Dataflow. I will ask that as a separate question.
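The ordering concern can be made concrete with a small plain-Java sketch. It says nothing about how Dataflow actually orders elements; it only shows that a running counter numbers lines in whatever order they arrive. `ArrivalOrderIndex` and `assign` are hypothetical names, the reordered list is an invented example, and the lines are assumed to be distinct.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration; hypothetical names, not part of Beam.
class ArrivalOrderIndex {
    // Assigns 1-based indices in the order the lines are handed in.
    // Assumes the lines are distinct, since they are used as map keys.
    static Map<String, Long> assign(List<String> arrivals) {
        Map<String, Long> indexByLine = new LinkedHashMap<>();
        long count = 0L;
        for (String line : arrivals) {
            count = count + 1;
            indexByLine.put(line, count);
        }
        return indexByLine;
    }

    public static void main(String[] args) {
        List<String> sourceOrder = List.of("first", "second", "third");
        // Invented upstream reordering, purely for illustration.
        List<String> reordered = List.of("second", "third", "first");
        System.out.println(assign(sourceOrder));
        System.out.println(assign(reordered));
    }
}
```

In the reordered run the line "first" receives index 3 rather than 1, so the index only matches the source line number if the arrival order matches the source order.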

Haris Nadeem answered Oct 17 '22 03:10