 

How to use Dataflow TextIO dynamic destinations in Java

Hello, I am very confused by the dynamic file destinations API, and there are no docs, so here I am.

The situation is that I have a PCollection containing events that belong to different partitions. I want to split them up and write them to different folders in GCS.

Here is what I have.

Dynamic destination object:

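  import org.apache.beam.sdk.io.{DefaultFilenamePolicy, FileBasedSink, FileSystems}
  import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations
  import org.apache.beam.sdk.options.ValueProvider
  import org.json4s.DefaultFormats
  import org.json4s.jackson.Serialization.write // assumed json4s backend; the native one works the same way

  class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {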

    override def getDestination(element: Event): String = {
      element.partition //this returns a string which is a gcs folder path
    }

    override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
      println(destination)
      val overallPrefix = s"$prefix/$destination/part-"
      DefaultFilenamePolicy.fromStandardParameters(
        ValueProvider.StaticValueProvider.of(
          FileSystems.matchNewResource(overallPrefix, false)),
        null, ".jsonl", true)
    }

    override def formatRecord(record: Event): String = {
      implicit val f = DefaultFormats
      write(record.toDataLakeFormat())
    }

    override def getDefaultDestination: String = "default"
  }

I believe the logic is correct: I ask each element for its destination partition, that value is passed into getFilenamePolicy, and a file name is built from it. To format the record, I just convert it to JSON.
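The routing described in the previous paragraph can be checked without Beam. Below is a minimal pure-Scala sketch, not Beam code: the `Event` case class and the `destinationFor`, `prefixFor` and `route` helpers are hypothetical stand-ins for `getDestination` and for the prefix built in `getFilenamePolicy`.

```scala
object RoutingSketch {
  // Hypothetical stand-in for the question's Event type; only the partition matters here.
  final case class Event(partition: String, id: Long)

  // Mirrors getDestination: the element's partition string is its destination.
  def destinationFor(e: Event): String = e.partition

  // Mirrors the prefix built in getFilenamePolicy: "<prefix>/<destination>/part-".
  def prefixFor(prefix: String, destination: String): String =
    s"$prefix/$destination/part-"

  // Groups events by the file prefix they would be written under (input order kept per group).
  def route(prefix: String, events: Seq[Event]): Map[String, Seq[Event]] =
    events.groupBy(e => prefixFor(prefix, destinationFor(e)))

  def main(args: Array[String]): Unit = {
    val events = Seq(Event("a", 1L), Event("b", 2L), Event("a", 3L))
    route("gs://bucket", events).toSeq.sortBy(_._1).foreach { case (p, es) =>
      println(s"$p -> ${es.map(_.id).mkString(",")}")
    }
  }
}
```

Each distinct partition yields exactly one prefix, and every event of that partition lands under it; in the real sink, the filename policy then adds shard information after the prefix.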

The issue is integrating this with TextIO. I tried this:

  TextIO
    .write()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gs://bucket"))

but it requires the input type to be String. Technically this could work, but I would have to deserialise multiple times. I found this in the docs for TextIO dynamic destinations:

Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.

So let's try that:

  TextIO
    .writeCustomType[Event]()
    .withWindowedWrites()
    .withTempDirectory(tempDir)
    .to(new GCSDestinationString("gs://bucket"))

This still doesn't compile, because writeCustomType returns TypedWrite<UserT, Void>, which has the knock-on effect of requiring the second type parameter of my DynamicDestinations object to be Void. I clearly need it to be a String, or at least something other than Void.
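To see why the compiler insists on Void, here is a toy model in plain Scala. The classes below are hypothetical stand-ins, not Beam's: `toSameDest` follows the signature as described above (the destinations object must reuse the writer's current destination type), while `toNewDest` is an assumed shape for a corrected signature, in which a method-level type parameter lets the caller pick a new destination type.

```scala
object TypeSignatureModel {
  // Toy stand-in for FileBasedSink.DynamicDestinations[UserT, DestinationT, OutputT].
  abstract class Destinations[UserT, DestT, OutT] {
    def destinationOf(element: UserT): DestT
  }

  // Toy stand-in for TextIO.TypedWrite[UserT, DestinationT].
  final class Write[UserT, DestT] {
    // Signature as described in the question: DestT is fixed by the writer itself.
    def toSameDest(d: Destinations[UserT, DestT, String]): Write[UserT, DestT] = this

    // Assumed corrected shape: the caller's destination type becomes the writer's.
    def toNewDest[NewDestT](d: Destinations[UserT, NewDestT, String]): Write[UserT, NewDestT] =
      new Write[UserT, NewDestT]
  }

  // Analogue of writeCustomType: the destination type starts out as Void.
  def writeCustomType[UserT](): Write[UserT, Void] = new Write[UserT, Void]

  def main(args: Array[String]): Unit = {
    val byPartition = new Destinations[String, String, String] {
      def destinationOf(element: String): String = element
    }
    // writeCustomType[String]().toSameDest(byPartition) // rejected: String is not Void
    val w: Write[String, String] = writeCustomType[String]().toNewDest(byPartition)
    println("toNewDest accepted a String destination type")
  }
}
```

Because `Destinations` is invariant in its destination type, a `Destinations[String, String, String]` can never satisfy a parameter typed `Destinations[String, Void, String]`; only a signature that introduces its own type parameter can accept it.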

I'm clearly missing something.

Asked Dec 21 '17 at 13:12 by Luke De Feo



2 Answers

Oh man, this is embarrassing. It turns out writeCustomType().to(DynamicDestinations) was not tested, and we didn't notice that it had a typo in its type signature. PR https://github.com/apache/beam/pull/4319 is in review. You'll still need 2.3.0-SNAPSHOT to pick it up, though, and if you are moving to that version anyway, I would still recommend just using FileIO.write().

Answered Sep 21 '22 at 10:09 by jkff



It doesn't seem to compile in Scala, but after digging around I was able to get the behaviour I wanted with a similar API:

  var outputTransform =
    TextIO
      .writeCustomType[T]()
      .withFormatFunction(outputFormatter) // SerializableFunction[T, String]
      .withNumShards(shards)
      .withTempDirectory(tempDir)
      .withCompression(compression)

  if (windowedWrites) {
    outputTransform = outputTransform.withWindowedWrites()
  }

  // Per-element mapping to DefaultFilenamePolicy.Params, plus the Params
  // used when there is no data to write.
  outputTransform.to(outputFileNamePolicyMapping, emptyDestination)

where outputFormatter is a function from T to String, and outputFileNamePolicyMapping is a function from T to DefaultFilenamePolicy.Params.
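`outputFormatter` only has to turn one element into one line of text; TextIO writes the line separator itself. Below is a dependency-free sketch of such a function. The `Event` fields and the hand-rolled escaping are assumptions for illustration; the question uses json4s `write`, which is the better choice in real code.

```scala
object JsonLinesSketch {
  // Hypothetical event type; the real Event class is not shown in the question.
  final case class Event(partition: String, id: Long, payload: String)

  // Escapes the characters JSON requires to be escaped inside a string literal.
  private def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append("\\u%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.toString
  }

  // Plays the role of outputFormatter: one element in, one JSON object out,
  // with no trailing newline.
  def format(e: Event): String =
    s"""{"partition":"${escape(e.partition)}","id":${e.id},"payload":"${escape(e.payload)}"}"""
}
```

In the Beam pipeline, a function like this would be wrapped in a `SerializableFunction[T, String]` and passed to `withFormatFunction`.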

Answered Sep 20 '22 at 10:09 by Luke De Feo
