Let me simplify my case. I'm using Apache Beam 0.6.0. My final processed result is a PCollection<KV<String, String>>, and I want to write the values to different files according to their keys.
For example, let's say the result consists of
(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)
Then I want to write value1, value3 and value4 to key1.txt, and write value2 to key2.txt.
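To make the desired result concrete, here is a minimal sketch in plain Python (no Beam involved) of the grouping and per-key writing described above. The function name `write_values_by_key` and the `out_dir` parameter are made up for illustration; it only shows the expected output layout, not how a distributed pipeline would produce it.

```python
import os
import tempfile
from collections import defaultdict


def write_values_by_key(pairs, out_dir):
    """Group (key, value) pairs by key and write each group to <out_dir>/<key>.txt.

    Hypothetical helper for illustration; everything is held in memory.
    """
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)

    paths = {}
    for key, values in grouped.items():
        path = os.path.join(out_dir, key + ".txt")
        with open(path, "w", encoding="utf-8") as f:
            for value in values:
                f.write(value + "\n")  # one value per line
        paths[key] = path
    return paths


if __name__ == "__main__":
    pairs = [("key1", "value1"), ("key2", "value2"),
             ("key1", "value3"), ("key1", "value4")]
    with tempfile.TemporaryDirectory() as out_dir:
        for key, path in sorted(write_values_by_key(pairs, out_dir).items()):
            with open(path, encoding="utf-8") as f:
                print(key, f.read().split())
```

With the four pairs from the example, key1.txt ends up holding value1, value3 and value4, and key2.txt holds value2.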
And in my case:
Any ideas?
A PTransform is an object describing (not executing) a computation. The actual execution semantics for a transform is captured by a runner object. A transform object always belongs to a pipeline object.
A PCollection<T> is an immutable collection of values of type T. A PCollection can contain either a bounded or unbounded number of elements.
A Pipeline manages a directed acyclic graph of PTransforms, and the PCollections that the PTransforms consume and produce. Each Pipeline is self-contained and isolated from any other Pipeline. The PValues that are inputs and outputs of each of a Pipeline's PTransforms are also owned by that Pipeline.
Handily, I wrote a sample of this case just the other day.
This example is in Dataflow 1.x style.
Basically you group by each key, and then you can do this with a custom transform that connects to cloud storage. Caveat being that your list of lines per-file shouldn't be massive (it's got to fit into memory on a single instance, but considering you can run high-mem instances, that limit is pretty high).
...
PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
    .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));
readyToWrite.apply(
    new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
...
And then the transform doing most of the work is:
public class PTransformWriteToGCS
    extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

  private static final Logger LOG = Logging.getLogger(PTransformWriteToGCS.class);
  private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

  private final String bucketName;
  private final SerializableFunction<String, String> pathCreator;

  public PTransformWriteToGCS(final String bucketName,
      final SerializableFunction<String, String> pathCreator) {
    this.bucketName = bucketName;
    this.pathCreator = pathCreator;
  }

  @Override
  public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {
    return input
        .apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {
          @Override
          public void processElement(
              final DoFn<KV<String, List<String>>, Void>.ProcessContext arg0)
              throws Exception {
            final String key = arg0.element().getKey();
            final List<String> values = arg0.element().getValue();
            // All lines for one key are joined in memory before the upload.
            final String toWrite = values.stream().collect(Collectors.joining("\n"));
            final String path = pathCreator.apply(key);
            BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                .setContentType(MimeTypes.TEXT)
                .build();
            LOG.info("blob writing to: {}", blobInfo);
            Blob result = STORAGE.create(blobInfo,
                toWrite.getBytes(StandardCharsets.UTF_8));
          }
        }));
  }
}
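The answer passes `TonyWordGrouper::derivePath` as the key-to-path function but does not show it. Purely as an illustration, a function of that shape might look like the Python sketch below. The name `derive_path`, the `prefix` and `extension` parameters, and the character whitelist are all assumptions, not the author's implementation.

```python
import re


def derive_path(key, prefix="output", extension=".txt"):
    """Map a key to an object path such as 'output/key1.txt'.

    Hypothetical sketch: characters outside [A-Za-z0-9._-] are replaced
    with '_' so that a key cannot introduce extra path segments.
    """
    safe = re.sub(r"[^A-Za-z0-9._-]", "_", key)
    if not safe:
        safe = "_empty"  # placeholder name for an empty key
    return f"{prefix}/{safe}{extension}"


if __name__ == "__main__":
    print(derive_path("key1"))
    print(derive_path("a/b c"))
```

One thing to keep in mind with this kind of sanitizing is that two distinct keys (for example "a/b" and "a_b") can map to the same path, so it only fits when the keys are known not to collide.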
Just write a loop in a ParDo function! More details: I had the same scenario today; the only difference is that in my case key=image_label and value=image_tf_record. So, like what you have asked, I am trying to create separate TFRecord files, one per class, each record file containing a number of images. However, I'm not sure whether there might be memory issues when the number of values per key is very high, as in your scenario. (Also, my code is in Python.)
class WriteToSeparateTFRecordFiles(beam.DoFn):

    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        l, image_list = element
        writer = tf.python_io.TFRecordWriter(self.outdir + "/tfr" + str(l) + '.tfrecord')
        for example in image_list:
            writer.write(example.SerializeToString())
        writer.close()
And then in your pipeline, just after the stage where you get key-value pairs, add these two lines:
(p
 | 'GroupByLabelId' >> beam.GroupByKey()
 | 'SaveToMultipleFiles' >> beam.ParDo(WriteToSeparateTFRecordFiles(outdir))
)
You can use FileIO.writeDynamic() for that:
PCollection<KV<String, String>> readfile = (something you read..);
readfile.apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to("somefolder")
    .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));
p.run();
In Apache Beam 2.2 Java SDK, this is natively supported in TextIO and AvroIO using respectively TextIO and AvroIO.write().to(DynamicDestinations). See e.g. this method.
Update (2018): Prefer to use FileIO.writeDynamic() together with TextIO.sink() and AvroIO.sink() instead.
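For readers unfamiliar with the idea of dynamic destinations, the plain-Python sketch below may help. It is only a conceptual model, not how Beam implements FileIO.writeDynamic(): each record is routed to a file chosen by a destination function, and files are opened lazily the first time a destination is seen. The class `DynamicTextWriter` and its parameter names are hypothetical; they loosely mirror the roles of `.by(...)`, `.via(...)` and `.withNaming(...)` in the Java snippet above. A real runner also has to deal with sharding and parallel workers, which this ignores.

```python
import os
import tempfile


class DynamicTextWriter:
    """Route each record to a text file chosen by a destination function.

    Hypothetical, single-process illustration of the dynamic-destination idea.
    """

    def __init__(self, out_dir, destination_fn, format_fn, naming_fn):
        self.out_dir = out_dir
        self.destination_fn = destination_fn  # record -> destination key
        self.format_fn = format_fn            # record -> line of text
        self.naming_fn = naming_fn            # destination key -> file name
        self._files = {}                      # destination key -> open file

    def write(self, record):
        dest = self.destination_fn(record)
        f = self._files.get(dest)
        if f is None:
            # First record for this destination: open its file lazily.
            path = os.path.join(self.out_dir, self.naming_fn(dest))
            f = self._files[dest] = open(path, "w", encoding="utf-8")
        f.write(self.format_fn(record) + "\n")

    def close(self):
        for f in self._files.values():
            f.close()
        self._files.clear()


if __name__ == "__main__":
    records = [("key1", "value1"), ("key2", "value2"),
               ("key1", "value3"), ("key1", "value4")]
    with tempfile.TemporaryDirectory() as out_dir:
        writer = DynamicTextWriter(
            out_dir,
            destination_fn=lambda kv: kv[0],
            format_fn=lambda kv: kv[1],
            naming_fn=lambda key: key + ".txt",
        )
        for record in records:
            writer.write(record)
        writer.close()
        print(sorted(os.listdir(out_dir)))
```

Unlike the group-then-write answers earlier in the thread, this style never needs all values for a key in memory at once, which is the practical appeal of letting the sink pick the destination per record.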