I've read both Beam's stateful processing and timely processing articles, but I ran into issues implementing the pattern itself. The problem I am trying to solve is similar to this one: generating a sequential index for every line, since I want to be able to map each line produced by Dataflow back to its position in the original source.
public static class createIndex extends DoFn<String, KV<String, String>> {
  @StateId("count")
  private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c, @StateId("count") ValueState<Long> countState) {
    String val = c.element();
    long count = 0L;
    if (countState.read() != null) {
      count = countState.read();
    }
    count = count + 1;
    countState.write(count);
    c.output(KV.of(String.valueOf(count), val));
  }
}
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new createIndex()));
I followed what I could find online and looked at the raw source code of ParDo, but I was not sure what needed to be done. The error I am getting is:
java.lang.IllegalArgumentException: ParDo requires its input to use KvCoder in order to use state and timers.
I realize this is a simple problem, but due to the lack of examples and documentation I was unable to fix it myself. I'd appreciate any help. Thanks!
Ok, so I continued working on the problem, and after reading some source code I was able to resolve the issue. It turns out that a stateful ParDo.of(new DoFn()) requires its input to be of the form KV<T, U>.

Therefore, in order to read the file and create an index for each line, I need to pass each line through a key-value pair first. Below is the code I added:
public static class FakeKvPair extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}
And changed the pipeline to:
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://randomBucket/file.txt"))
 .apply(ParDo.of(new FakeKvPair()))
 .apply(ParDo.of(new createIndex()));
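One detail worth noting: once FakeKvPair runs, the elements flowing into the stateful DoFn are KV<String, String>, so the DoFn's input type parameter has to change from String to KV<String, String> as well. Below is a sketch of what the adjusted DoFn could look like (the class name CreateIndexKv is my own; it assumes the Beam Java SDK is on the classpath and is not the poster's exact code):

import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class CreateIndexKv extends DoFn<KV<String, String>, KV<String, String>> {
  @StateId("count")
  private final StateSpec<ValueState<Long>> countState = StateSpecs.value(VarLongCoder.of());

  @ProcessElement
  public void processElement(ProcessContext c,
                             @StateId("count") ValueState<Long> countState) {
    // State is kept per key (and window); read once, defaulting to 0.
    Long stored = countState.read();
    long count = (stored == null ? 0L : stored) + 1;
    countState.write(count);
    // Emit the running count as the key and the original line as the value.
    c.output(KV.of(String.valueOf(count), c.element().getValue()));
  }
}

Because state is partitioned per key, every element must carry the same key (here, the empty string from FakeKvPair) for the counter to be global across the whole file.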
The new problem this raises is whether the order of the lines is preserved, since I am now running an extra ParDo (which could potentially reorder the lines) before the elements reach createIndex(). On my local machine the order is preserved, but I am not sure how that scales to Dataflow. I will ask that as a separate question.