
Does Apache Beam support custom file names for its output?

While it is common in distributed processing environments to use "part" file names such as "part-000", is it possible to write some sort of extension that customizes the individual output file names in Apache Beam (for example, a per-window file name)?

To do this, one would probably need to be able to assign a name to a window or infer a file name from the window's contents. I would like to know whether such an approach is possible.

As to whether the solution should target streaming or batch, a streaming-mode example is preferable.

asked Oct 09 '17 by Ravindranath Akila

2 Answers

Yes, as jkff suggested, you can achieve this using TextIO.write().to(FilenamePolicy).

Examples are below:

If you want to write output to a particular local file, you can use the following. Note that by default Beam appends a shard suffix to this prefix, producing files such as /path/to/file.txt-00000-of-00003; calling withoutSharding() forces a single output file with exactly that name.

lines.apply(TextIO.write().to("/path/to/file.txt"));

Below is a simple way to write output using a file prefix. This example writes to Google Cloud Storage, but local or S3 paths can be used instead.

import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

public class MinimalWordCountJava8 {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();
    // In order to run your pipeline, you need to make following runner specific changes:
    //
    // CHANGE 1/3: Select a Beam runner, such as BlockingDataflowRunner
    // or FlinkRunner.
    // CHANGE 2/3: Specify runner-required options.
    // For BlockingDataflowRunner, set project and temp location as follows:
    //   DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    //   dataflowOptions.setRunner(BlockingDataflowRunner.class);
    //   dataflowOptions.setProject("SET_YOUR_PROJECT_ID_HERE");
    //   dataflowOptions.setTempLocation("gs://SET_YOUR_BUCKET_NAME_HERE/AND_TEMP_DIRECTORY");
    // For FlinkRunner, set the runner as follows. See {@code FlinkPipelineOptions}
    // for more details.
    //   options.as(FlinkPipelineOptions.class)
    //      .setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/*"))
     .apply(FlatMapElements
         .into(TypeDescriptors.strings())
         .via((String word) -> Arrays.asList(word.split("[^\\p{L}]+"))))
     .apply(Filter.by((String word) -> !word.isEmpty()))
     .apply(Count.<String>perElement())
     .apply(MapElements
         .into(TypeDescriptors.strings())
         .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
     // CHANGE 3/3: A Google Cloud Storage path is required for writing the results.
     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));

    p.run().waitUntilFinish();
  }
}

This example code gives you more control over how the output files are named:

 /**
   * A {@link FilenamePolicy} produces a base file name for a write based on metadata about the data
   * being written. This always includes the shard number and the total number of shards. For
   * windowed writes, it also includes the window and pane index (a sequence number assigned to each
   * trigger firing).
   */
  protected static class PerWindowFiles extends FilenamePolicy {

    private final ResourceId prefix;
    // Note: the original snippet assumes a Joda-Time formatter for the window bounds,
    // e.g. ISODateTimeFormat.hourMinute() as used in Beam's WriteOneFilePerWindow example.
    private static final DateTimeFormatter formatter = ISODateTimeFormat.hourMinute();

    public PerWindowFiles(ResourceId prefix) {
      this.prefix = prefix;
    }

    public String filenamePrefixForWindow(IntervalWindow window) {
      String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
      return String.format(
          "%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
    }

    @Override
    public ResourceId windowedFilename(int shardNumber,
                                       int numShards,
                                       BoundedWindow window,
                                       PaneInfo paneInfo,
                                       OutputFileHints outputFileHints) {
      IntervalWindow intervalWindow = (IntervalWindow) window;
      String filename =
          String.format(
              "%s-%s-of-%s%s",
              filenamePrefixForWindow(intervalWindow),
              shardNumber,
              numShards,
              outputFileHints.getSuggestedFilenameSuffix());
      return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        int shardNumber, int numShards, OutputFileHints outputFileHints) {
      throw new UnsupportedOperationException("Unsupported.");
    }
  }

  @Override
  public PDone expand(PCollection<InputT> teamAndScore) {
    if (windowed) {
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(new WriteToText.WriteOneFilePerWindow(filenamePrefix));
    } else {
      teamAndScore
          .apply("ConvertToRow", ParDo.of(new BuildRowFn()))
          .apply(TextIO.write().to(filenamePrefix));
    }
    return PDone.in(teamAndScore.getPipeline());
  }
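For completeness, a policy like PerWindowFiles above is typically attached to TextIO.write() roughly as follows. This is only a sketch against the Beam 2.x API used in the snippets above; the bucket path and the windowed PCollection<String> named lines are placeholders.

// Sketch: wiring the PerWindowFiles policy into a windowed TextIO write.
// Assumes org.apache.beam.sdk.io.FileSystems and org.apache.beam.sdk.io.fs.ResourceId are imported.
ResourceId baseFile =
    FileSystems.matchNewResource("gs://YOUR_BUCKET/output/results", false /* isDirectory */);

lines.apply(
    TextIO.write()
        .to(new PerWindowFiles(baseFile))                  // custom per-window file names
        .withTempDirectory(baseFile.getCurrentDirectory()) // a temp directory is required with a custom policy
        .withWindowedWrites()                              // emit files per window/pane
        .withNumShards(3));                                // optional: fix the shard count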
answered Sep 29 '22 by Abhijeet Dhumal

Yes. Per the TextIO documentation:

If you want better control over how filenames are generated than the default policy allows, a custom FilenamePolicy can also be set using TextIO.Write.to(FilenamePolicy)
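For a batch (unwindowed) write, a minimal custom policy along those lines could look roughly like the sketch below. It uses the same FilenamePolicy method signatures as the snippet in the other answer; the class name, prefix handling, and naming pattern are illustrative.

// Sketch of a custom FilenamePolicy for an unwindowed (batch) write.
static class CustomNamePolicy extends FileBasedSink.FilenamePolicy {

  private final ResourceId baseDir;
  private final String prefix;

  CustomNamePolicy(ResourceId baseDir, String prefix) {
    this.baseDir = baseDir;
    this.prefix = prefix;
  }

  @Override
  public ResourceId unwindowedFilename(
      int shardNumber, int numShards, OutputFileHints outputFileHints) {
    // e.g. "scores-001-of-010" plus any suggested suffix, instead of the default "part"-style name
    String filename = String.format(
        "%s-%03d-of-%03d%s",
        prefix, shardNumber, numShards, outputFileHints.getSuggestedFilenameSuffix());
    return baseDir.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
  }

  @Override
  public ResourceId windowedFilename(
      int shardNumber, int numShards, BoundedWindow window,
      PaneInfo paneInfo, OutputFileHints outputFileHints) {
    throw new UnsupportedOperationException("This policy only supports unwindowed writes.");
  }
}

It would then be attached with something like TextIO.write().to(new CustomNamePolicy(baseDir, "scores")).withTempDirectory(baseDir), analogous to the windowed example in the other answer.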

answered Sep 29 '22 by jkff