In a distributed processing environment it is common for output files to get "part" names such as "part-000". Is it possible to write an extension of some sort that renames the individual output files of Apache Beam, for example to give each window its own file name?
To do this, one would need to be able to assign a name to a window or infer a file name from the window's content. I would like to know whether such an approach is possible.
As for whether the solution should be streaming or batch, a streaming-mode example is preferable.
Yes. As jkff suggested, you can achieve this with TextIO.write().to(FilenamePolicy).
Examples are below.
If you want to write the output to a particular local file, you can use:
lines.apply(TextIO.write().to("/path/to/file.txt"));
Below is a simple way to write the output using a file name prefix. This example writes to Google Cloud Storage; you can use a local or S3 path instead.
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalWordCountJava8 {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // In order to run your pipeline, you need to make the following runner-specific changes:
    //
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
    // or FlinkRunner.
    // CHANGE 2/3: Specify runner-required options.
    // For BlockingDataflowRunner, set project and temp location as follows:
    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    // for more details.
    //   options.as(FlinkPipelineOptions.class)
    //      .setRunner(FlinkRunner.class);
    Pipeline p = Pipeline.create(options);
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
        // Split each line into words on any run of non-letter characters.
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
        .apply(Filter.by((String word) -> !word.isEmpty()))
        .apply(Count.<String>perElement())
        // Format each (word, count) pair as a line of text.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
        // CHANGE 3/3: A Google Cloud Storage path is required for the output.
        .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
    p.run().waitUntilFinish();
  }
}
The following example code gives you more control over how the output files are named:
/**
 * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
 * being written. This always includes the shard number and the total number of shards. For
 * windowed writes, it also includes the window and pane index (a sequence number assigned to each
 * trigger firing).
 */
protected static class PerWindowFiles extends FilenamePolicy {
  private final ResourceId prefix;

  public PerWindowFiles(ResourceId prefix) {
    this.prefix = prefix;
  }

  // Note: `formatter` is not defined in this excerpt. It is assumed to be a Joda-Time
  // DateTimeFormatter field of the enclosing class (for example ISODateTimeFormat.hourMinute()).
  public String filenamePrefixForWindow(IntervalWindow window) {
    String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
    return String.format(
        "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber,
      int numShards,
      BoundedWindow window,
      PaneInfo paneInfo,
      OutputFileHints outputFileHints) {
    // This cast only works for interval-based windowing such as fixed or sliding windows.
    IntervalWindow intervalWindow = (IntervalWindow) window;
    String filename =
        String.format(
            "%s-%s-of-%s%s",
            filenamePrefixForWindow(intervalWindow),
            shardNumber,
            numShards,
            outputFileHints.getSuggestedFilenameSuffix());
    return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    // This policy is meant for windowed writes only.
    throw new UnsupportedOperationException("Unsupported.");
  }
}
// In the enclosing PTransform, the per-window writer is used only when `windowed` is set.
@Override
public PDone expand(PCollection<InputT> teamAndScore) {
  if (windowed) {
    teamAndScore
        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
        .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
  } else {
    teamAndScore
        .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
        .apply(TextIO.write().to(filenamePrefix));
  }
  return PDone.in(teamAndScore.getPipeline());
}
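To make the naming scheme of PerWindowFiles easier to see, here is a small plain-JDK sketch that builds the same kind of name ("prefix-start-end-shard-of-numShards" plus a suffix). It is not Beam code: the class name WindowFileNames, the method fileName, and the use of java.time with an "HH:mm" UTC pattern in place of the Joda-Time formatter are all assumptions made for illustration.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Plain-JDK illustration of the file names a per-window policy could produce (hypothetical helper). */
final class WindowFileNames {
  // Assumption: hour and minute in UTC, standing in for the Joda-Time formatter of the original.
  private static final DateTimeFormatter FORMATTER =
      DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

  /** Builds "prefix-start-end-shard-of-numShards" followed by the suffix. */
  static String fileName(
      String prefix, Instant start, Instant end, int shard, int numShards, String suffix) {
    // Window part: the prefix plus the formatted window boundaries.
    String windowPrefix =
        String.format("%s-%s-%s", prefix, FORMATTER.format(start), FORMATTER.format(end));
    // Shard part: shard number, total shard count, and the suggested suffix.
    return String.format("%s-%s-of-%s%s", windowPrefix, shard, numShards, suffix);
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2017-06-01T10:00:00Z");
    Instant end = Instant.parse("2017-06-01T10:05:00Z");
    // prints: output-10:00-10:05-0-of-1.txt
    System.out.println(fileName("output", start, end, 0, 1, ".txt"));
  }
}
```

Each window therefore gets its own set of files, and the shard suffix keeps names unique when a window is written by more than one shard.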
Yes. Per the TextIO documentation:
"If you want better control over how filenames are generated than the default policy allows, a custom FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy)."
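Since the question asks for streaming mode, here is a rough sketch of how a custom policy might be plugged into a windowed write. It assumes a Beam 2.x Java SDK on the classpath; the class name PerWindowWrite, the helper method write, the one-minute fixed windows, and the single shard are illustrative choices, not part of the documentation quoted above. As far as I know, a temp directory has to be set explicitly when a custom FilenamePolicy is used, and unbounded input needs windowed writes with an explicit shard count, at least in older releases.

```java
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;

/** Hypothetical helper that wires a FilenamePolicy into a windowed TextIO write. */
final class PerWindowWrite {
  static PDone write(PCollection<String> lines, String outputPrefix, FilenamePolicy policy) {
    // Turn the prefix string into a ResourceId so a temp directory can be derived from it.
    ResourceId prefix = FileBasedSink.convertToFileResourceIfPossible(outputPrefix);
    return lines
        // Assumption: one-minute fixed windows; pick whatever windowing fits the data.
        .apply("FixedWindows", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(
            "WritePerWindow",
            TextIO.write()
                .to(policy) // for example an instance of a policy like PerWindowFiles
                .withTempDirectory(prefix.getCurrentDirectory())
                .withWindowedWrites()
                .withNumShards(1)); // assumption: one file per window
  }
}
```

With a policy along the lines of PerWindowFiles, each window closed by the runner would then produce its own file instead of a generic "part" file.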