I'm having trouble creating a Map PCollectionView with the DataflowRunner.
The pipeline below aggregates an unbounded CountingInput together with values from a side input (containing 10 generated values). When running the pipeline on GCP, it gets stuck inside the View.asMap() transform. More specifically, the ParDo(StreamingPCollectionViewWriter) does not produce any output.
I tried this with Dataflow 2.0.0-beta3 as well as with beam-0.7.0-SNAPSHOT, without success. Note that my pipeline runs without any problem when using the local DirectRunner.
Am I doing something wrong? All help is appreciated, thanks in advance for helping me out!
public class SimpleSideInputPipeline {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
            .apply(CountingInput.forSubrange(0L, 10L))
            .apply("Create KV<Integer, String>", ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.output(KV.of(c.element().intValue(), "TEST"));
                }
            }))
            .apply(View.asMap());

        pipeline
            .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5)))
            .apply("Aggregate with side-input", ParDo.of(new DoFn<Long, KV<Long, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<Integer, String> map = c.sideInput(sideInput);
                    // get first segment from map
                    Object[] values = map.values().toArray();
                    String firstVal = (String) values[0];
                    LOG.info("Combined: K: " + c.element() + " V: " + firstVal + " MapSize: " + map.size());
                    c.output(KV.of(c.element(), firstVal));
                }
            }).withSideInputs(sideInput));

        pipeline.run();
    }
}
There is no need to worry that the ParDo(StreamingPCollectionViewWriterFn) does not record any output: what it actually does is write each element to an internal location.
Your code looks OK to me, and this should be investigated. I have filed BEAM-2155.