Dataflow Map side-input issue

I'm having trouble creating a Map PCollectionView with the DataflowRunner.

The pipeline below combines an unbounded CountingInput with values from a side input (containing 10 generated values). When the pipeline runs on GCP, it gets stuck inside the View.asMap() transform. More specifically, the ParDo(StreamingPCollectionViewWriter) does not produce any output.

I tried this with Dataflow 2.0.0-beta3 as well as with beam-0.7.0-SNAPSHOT, and the result was the same. Note that the pipeline runs without any problem on the local DirectRunner.

Am I doing something wrong? All help is appreciated, thanks in advance for helping me out!

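import java.io.IOException;
import java.util.Map;

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingInput;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SimpleSideInputPipeline {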

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
                .apply(CountingInput.forSubrange(0L, 10L))
                .apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                    }
                }))
                .apply(View.asMap());

        pipeline
            .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5)))
            .apply("Aggregate with side-input",ParDo.of(new DoFn<Long, KV<Long, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<Integer, String> map = c.sideInput(sideInput);

                    //get first segment from map
                    Object[] values = map.values().toArray();
                    String firstVal = (String) values[0];
                    LOG.info("Combined: K: "+ c.element() + " V: " + firstVal + " MapSize: " + map.size());
                    c.output(KV.of(c.element(), firstVal));
                }
            }).withSideInputs(sideInput));

        pipeline.run();
    }
}
Wout asked Nov 08 '22 22:11



1 Answer

There is no need to worry that the ParDo(StreamingPCollectionViewWriterFn) does not record any output: what it actually does is write each element to an internal location.
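To illustrate why a step like this can legitimately show zero output elements, here is a toy model in plain Java. It is only a sketch of the idea and not Beam's or Dataflow's implementation: the `ViewStore` class and the `writeView` method are hypothetical names invented for this example, and the "internal location" is modeled as an in-memory map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ViewWriterSketch {

    /** Hypothetical stand-in for the runner's internal side-input storage. */
    static final class ViewStore {
        private final Map<Integer, String> contents = new HashMap<>();

        void write(int key, String value) {
            contents.put(key, value);
        }

        /** Returns a copy of everything written so far. */
        Map<Integer, String> read() {
            return new HashMap<>(contents);
        }
    }

    /**
     * Writes every input element to the store and returns the list of
     * elements emitted downstream, which is always empty in this model.
     */
    static List<Map.Entry<Integer, String>> writeView(
            Map<Integer, String> input, ViewStore store) {
        for (Map.Entry<Integer, String> e : input.entrySet()) {
            store.write(e.getKey(), e.getValue());
        }
        return new ArrayList<>(); // nothing is emitted as output
    }

    public static void main(String[] args) {
        // Ten generated values, mirroring the side input in the question.
        Map<Integer, String> input = new HashMap<>();
        for (int i = 0; i < 10; i++) {
            input.put(i, "TEST");
        }

        ViewStore store = new ViewStore();
        List<Map.Entry<Integer, String>> emitted = writeView(input, store);

        // The step "records" no output, yet the view holds all ten entries.
        System.out.println("emitted=" + emitted.size()
                + " stored=" + store.read().size()); // prints "emitted=0 stored=10"
    }
}
```

In this model an output count of zero says nothing about whether the view was populated, so a stuck pipeline has to be diagnosed some other way.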

Your code looks OK to me, and this should be investigated. I have filed BEAM-2155.

Kenn Knowles answered Dec 20 '22 19:12
