Hello i am very confused By the dynamic file destinations api and there is no docs so here i am.
The situation is i have a PCollection and it contains events belonging to different partitions. I want to split them up and write them to different folders in gcs.
Here is what i have.
Dynamic destination object:
class GCSDestinationString(prefix: String) extends DynamicDestinations[Event, String, String] {
override def getDestination(element: Event): String = {
element.partition //this returns a string which is a gcs folder path
}
override def getFilenamePolicy(destination: String): FileBasedSink.FilenamePolicy = {
println(destination)
val overallPrefix = s"$prefix/$destination/part-"
DefaultFilenamePolicy.fromStandardParameters(
ValueProvider.StaticValueProvider.of(
FileSystems.matchNewResource(overallPrefix, false)),
null, ".jsonl", true)
}
override def formatRecord(record: Event): String = {
implicit val f = DefaultFormats
write(record.toDataLakeFormat())
}
override def getDefaultDestination: String = "default"
}
I believe this is the correct logic, i ask each element what its destination partition is and then that get passed into the getFileNamePolicy and from there a file name is built. To format the record i just convert it to json.
The issue is integrating this with TextIO, i tried this
TextIO.
write()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
but it requires that the source type be string, technically this could work but i would have to deserialise multiple times. I found in the docs for text io dynamic destinations
Often this is used in conjunction with {@link TextIO#writeCustomType}, which allows your {@link DynamicDestinations} object to examine the input type and takes a format function to convert that type to a string for writing.
So lets try that
TextIO
.writeCustomType[Event]()
.withWindowedWrites()
.withTempDirectory(tempDir)
.to(new GCSDestinationString("gcs://bucket"))
This still doesn't compile as writeCustomType internally returns TypedWrite<UserT, Void>
and that has the knock on affect of requiring the 2nd type parameter of my dynamic destination object to be Void. Clearly i require it to be a string or at least something other than Void
Im clearly missing something
Oh man, this is embarrassing. Turns out writeCustomType().to(DynamicDestinations)
was not tested and we didn't notice it, but it had a typo in the type signature. PR https://github.com/apache/beam/pull/4319 is in review. You'll still need 2.3.0-SNAPSHOT to pick it up though, in which case I would still recommend to just use FileIO.write()
.
It doesn't seem to compile in scala but i was able to get the behaviour i wanted with a similar api after digging around
var outputTransform =
TextIO.
writeCustomType[T]()
.withFormatFunction(outputFormatter)
.withNumShards(shards)
.withTempDirectory(tempDir)
.withCompression(compression)
if (windowedWrites) {
outputTransform = outputTransform.withWindowedWrites()
}
outputTransform.to(outputFileNamePolicyMapping, emptryDestination)
where output formatter is from T to string and outputFileNamePolicyMapping is from T to DefaultFilenamePolicy.Params
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With