I'm using Dataflow to generate a large amount of data.
I've tested two versions of my pipeline: one with a side input (of varying sizes) and the other without.
When I run the pipeline without the side input, the job finishes in about 7 minutes. When I run it with the side input, the job never finishes.
Here's what my DoFn looks like:
public class MyDoFn extends DoFn<String, String> {
    // View over the grouped data, materialized as a Map side input.
    final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pCollectionView;
    // Small local list, serialized along with the DoFn itself.
    final List<CSVRecord> stuff;
    private Aggregator<Integer, Integer> dofnCounter =
            createAggregator("DoFn Counter", new Sum.SumIntegerFn());

    public MyDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv, List<CSVRecord> m) {
        this.pCollectionView = pcv;
        this.stuff = m;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        // Read the side input and generate output from it.
        Map<String, Iterable<TreeMap<Long, Float>>> pdata = processContext.sideInput(pCollectionView);
        processContext.output(AnotherClass.generateData(stuff, pdata));
        dofnCounter.addValue(1);
    }
}
And here's what my pipeline looks like:
final Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// Read and parse the side-input data into key/value pairs.
PCollection<KV<String, TreeMap<Long, Float>>> data;
data = p.apply(TextIO.Read.from("gs://where_the_files_are/*").named("Reading Data"))
        .apply(ParDo.named("Parsing data").of(new DoFn<String, KV<String, TreeMap<Long, Float>>>() {
            @Override
            public void processElement(ProcessContext processContext) throws Exception {
                // Parse some data
                processContext.output(KV.of(key, value));
            }
        }));

// Group by key and materialize the grouped result as a Map-valued view.
final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> pcv =
        data.apply(GroupByKey.<String, TreeMap<Long, Float>>create())
                .apply(View.<String, Iterable<TreeMap<Long, Float>>>asMap());

DoFn<String, String> dofn = new MyDoFn(pcv, localList);
p.apply(TextIO.Read.from("gs://some_text.txt").named("Sizing"))
        .apply(ParDo.named("Generating the Data").withSideInputs(pcv).of(dofn))
        .apply(TextIO.Write.named("Write_out").to(outputFile));
p.run();
We've spent about two days trying various approaches to get this working, and we've narrowed the problem down to the inclusion of the side input. Even when processElement is modified to never read the side input, the job is still very slow as long as the side input is attached via .withSideInputs(); if we don't call .withSideInputs(), it's fast again.
Just to clarify, we've tested this with side input sizes from 20 MB to 1.5 GB.
Very grateful for any insight.
Edit: including a few job IDs:
2016-01-20_14_31_12-1354600113427960103
2016-01-21_08_04_33-1642110636871153093 (latest)
Please try out Dataflow SDK 1.5.0+; it should address the known performance problems behind your issue.
Side inputs in Dataflow SDK 1.5.0+ use a new distributed format when running batch pipelines. Note that streaming pipelines, and pipelines using older versions of the Dataflow SDK, are still subject to re-reading the side input if the view cannot be cached entirely in memory.
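Upgrading is typically just a dependency bump. A minimal sketch of the Maven change, assuming the standard google-cloud-dataflow-java-sdk-all artifact (the version number below is illustrative; any 1.5.0+ release should do):
<dependency>
    <groupId>com.google.cloud.dataflow</groupId>
    <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
    <version>1.5.0</version>
</dependency>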
With the new format, we use an index to provide a block-based lookup and caching strategy. Thus, when looking into a list by index or into a map by key, only the block that contains that index or key will be loaded. Having a cache size greater than the working set size will aid performance, since frequently accessed indices/keys will not require re-reading the block they are contained in.
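To benefit from this, prefer point lookups by key inside processElement rather than scanning the whole map. A minimal sketch under that assumption, in the same SDK 1.x style as your code (extractKey is a hypothetical stand-in for however you derive the lookup key from an element):
public class LookupDoFn extends DoFn<String, String> {
    final PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> view;

    public LookupDoFn(PCollectionView<Map<String, Iterable<TreeMap<Long, Float>>>> view) {
        this.view = view;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        Map<String, Iterable<TreeMap<Long, Float>>> side = c.sideInput(view);
        // With the 1.5.0+ format, a get() by key should only load the
        // block containing that key rather than the entire map.
        String key = extractKey(c.element());
        Iterable<TreeMap<Long, Float>> match = side.get(key);
        if (match != null) {
            c.output(key);
        }
    }

    // Hypothetical key extraction; replace with your real parsing logic.
    private String extractKey(String element) {
        return element.split(",")[0];
    }
}
If the hot keys don't fit in the default cache, the worker cache size can be raised (via the workerCacheMb pipeline option, if memory serves) so frequently used blocks stay resident.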