How do I write to multiple files in Apache Beam?

Let me simplify my case. I'm using Apache Beam 0.6.0. My final processed result is a PCollection<KV<String, String>>, and I want to write the values to different files according to their keys.

For example, let's say the result consists of

(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)

Then I want to write value1, value3 and value4 to key1.txt, and write value2 to key2.txt.

And in my case:

  • Key set is determined when the pipeline is running, not when constructing the pipeline.
  • The key set may be quite small, but the number of values corresponding to each key may be very large.

Any ideas?

asked Apr 08 '17 by abcdabcd987



4 Answers

Handily, I wrote a sample of this case just the other day.

This example is written in the Dataflow 1.x style.

Basically, you group by each key and then write the grouped values with a custom transform that connects to Cloud Storage. The caveat is that the list of lines per file shouldn't be massive (it has to fit into memory on a single instance, but considering you can run high-mem instances, that limit is pretty high).

    ...
    PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
        .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));

    readyToWrite.apply(
        new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
    ...
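The AccumulatorOfWords combine function referenced above isn't shown in the answer. A minimal stand-in (purely illustrative, not the original author's code) would simply collect every value for a key into a list:

// Hypothetical stand-in for AccumulatorOfWords.getCombineFn(): gathers all
// values observed for a key into a single List<String>.
public static Combine.CombineFn<String, List<String>, List<String>> getCombineFn() {
    return new Combine.CombineFn<String, List<String>, List<String>>() {

        @Override
        public List<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> addInput(final List<String> accumulator, final String input) {
            accumulator.add(input);
            return accumulator;
        }

        @Override
        public List<String> mergeAccumulators(final Iterable<List<String>> accumulators) {
            final List<String> merged = new ArrayList<>();
            for (final List<String> accumulator : accumulators) {
                merged.addAll(accumulator);
            }
            return merged;
        }

        @Override
        public List<String> extractOutput(final List<String> accumulator) {
            return accumulator;
        }
    };
}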

And then the transform doing most of the work is:

// Writes each (key, list-of-lines) element as a single text object in Google
// Cloud Storage. The object path is derived from the key via pathCreator.
public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = LoggerFactory.getLogger(PTransformWriteToGCS.class);

    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;

    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
        final SerializableFunction<String, String> pathCreator) {
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {

        return input
            .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {

                @Override
                public void processElement(
                    final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
                    throws Exception {
                    final String key = arg0.element().getKey();
                    final List<String> values = arg0.element().getValue();

                    // Join all lines for this key and upload them as one object.
                    final String toWrite = values.stream().collect(Collectors.joining("\n"));
                    final String path = pathCreator.apply(key);
                    final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                    LOG.info("blob writing to: {}", blobInfo);
                    STORAGE.create(blobInfo, toWrite.getBytes(StandardCharsets.UTF_8));
                }
            }));
    }
}
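The TonyWordGrouper::derivePath function passed to the transform isn't shown in the answer either. It is just a mapping from key to object path; a hypothetical version could be:

// Hypothetical path function standing in for TonyWordGrouper::derivePath:
// turns a key into the object path used inside the bucket.
public static String derivePath(final String key) {
    return "output/" + key + ".txt";
}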
answered Oct 09 '22 by CasualT


Just write a loop in a ParDo function! More details: I had the same scenario today; the only difference is that in my case key = image_label and value = image_tf_record. So, like what you asked, I am trying to create separate TFRecord files, one per class, each containing a number of images. However, I'm not sure whether there might be memory issues when the number of values per key is very high, as in your scenario. (Also, my code is in Python.)

class WriteToSeparateTFRecordFiles(beam.DoFn):

    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        l, image_list = element
        # One TFRecord file per key, named after the label l.
        writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
        for example in image_list:
            writer.write(example.SerializeToString())
        writer.close()

And then in your pipeline, just after the stage where you get the key-value pairs, add these two lines:

   (p
    | 'GroupByLabelId' >> beam.GroupByKey()
    | 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(output_dir))  # output_dir: your destination directory
    )
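Note that after beam.GroupByKey(), each element is one key paired with an iterable of all its values, so process runs once per key and (in the default global window) produces exactly one file per key. The full value list for a key still has to be handled by a single worker, which is the same memory caveat mentioned in the first answer.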
answered Oct 09 '22 by whywhywhy


You can use FileIO.writeDynamic() for that:

PCollection<KV<String, String>> readfile = (something you read..);

readfile.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("somefolder")
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));

p.run();
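For context, here is a minimal end-to-end sketch (the options object and the output folder are placeholders, not from the answer) that feeds the question's example pairs through the same writeDynamic() call:

// Minimal, self-contained sketch: create the question's example KV pairs
// in-memory and write one set of text files per key via FileIO.writeDynamic().
// "somefolder" and `options` are placeholders.
Pipeline p = Pipeline.create(options);

p.apply(Create.of(
        KV.of("key1", "value1"),
        KV.of("key2", "value2"),
        KV.of("key1", "value3"),
        KV.of("key1", "value4")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
        .by(KV::getKey)
        .withDestinationCoder(StringUtf8Coder.of())
        .via(Contextful.fn(KV::getValue), TextIO.sink())
        .to("somefolder")
        .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));

p.run().waitUntilFinish();

With the default naming this produces sharded files such as somefolder/key1-00000-of-00001.txt rather than bare key1.txt; a custom FileIO.Write.FileNaming can be supplied if the exact file names matter.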
answered Oct 09 '22 by Michael Julian


In the Apache Beam 2.2 Java SDK, this is natively supported in TextIO and AvroIO, via TextIO.write().to(DynamicDestinations) and AvroIO.write().to(DynamicDestinations) respectively.

Update (2018): prefer FileIO.writeDynamic() together with TextIO.sink() and AvroIO.sink() instead.

answered Oct 10 '22 by jkff