How to access the elements of a side input if I have my class extend DoFn?
For example:
Say I have a ParDo transform like:
PCollection<String> data = myData.apply("Get data",
ParDo.of(new MyClass()).withSideInputs(myDataView));
And I have a class:-
static class MyClass extends DoFn<String,String>
{
//How to access side input here
}
c.sideInput() isn't working in this case.
Thanks.
In this case, the problem is that the processElement
method in your DoFn does not have access to the PCollectionView instance in your main method.
You can pass the PCollectionView to the DoFn in the constructor:
class MyClass extends DoFn<String,String>
{
private final PCollectionView<..> mySideInput;
public MyClass(PCollectionView<..> mySideInput) {
// List, or Map or anything:
this.mySideInput = mySideInput;
}
@ProcessElement
public void processElement(ProcessContext c) throws IOException
{
// List or Map or any type you need:
List<..> sideInputList = c.sideInput(mySideInput);
}
}
You would then pass the side input to the class when you instantiate it, and indicate it as a side input like so:
p.apply(ParDo.of(new MyClass(mySideInput)).withSideInputs(mySideInput));
The explanation for this is that when you use an anonymous DoFn, the process method has a closure with access to all the objects within the scope that encloses the DoFn (among them is the PCollectionView). When you're not using an anonymous DoFn, there is no closure, and you need another way of passing the PCollectionView.
So although the answer above is correct, it is still a little incomplete.
So once you finish implementing the above answer, you need to execute your pipeline like this:
p.apply(ParDo.of(new MyClass(mySideInput)).withSideInputs(mySideInput));
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With