
Writing to text files in Apache Beam / Dataflow Python streaming

I have a very basic Python Dataflow job that reads some data from Pub/Sub, applies a FixedWindow and writes to Google Cloud Storage.

transformed = ...
transformed | beam.io.WriteToText(known_args.output)

The output is written to the location specified in --output, but only to the temporary staging area, i.e.

gs://MY_BUCKET/MY_DIR/beam-temp-2a5c0e1eec1c11e8b98342010a800004/...some_UUID...

The files never get moved into the final, correctly named location with the sharding template.

Tested with both the local (direct) runner and the Dataflow runner.


When testing further, I noticed that the streaming_wordcount example has the same issue, while the standard wordcount example writes fine. Perhaps the issue is to do with windowing, or with reading from Pub/Sub?


It appears WriteToText is not compatible with streaming sources such as Pub/Sub. There are likely workarounds, and the Java version may be compatible, but I opted for a different solution altogether.

Daniel Messias asked Nov 19 '18 17:11



1 Answer

The WriteToText transform in the Python SDK does not support streaming.

Instead, you may consider the transforms in apache_beam.io.fileio. In this case, you can write something like this (let's suppose 10-minute windows):

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.transforms.window import FixedWindows

my_pcollection = (p
                  | ReadFromPubSub(....)
                  | beam.WindowInto(FixedWindows(10 * 60))
                  | fileio.WriteToFiles(path=known_args.output))

This is enough to write out separate files for each window, and continue to do it as the stream advances.
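To see why each window produces its own files, it helps to look at how fixed windows bucket timestamps: every element is assigned to the 10-minute interval containing its event time. A minimal pure-Python sketch of that bucketing (illustrative only, not Beam's internal implementation):

```python
WINDOW_SIZE = 10 * 60  # seconds, matching FixedWindows(10 * 60)


def fixed_window_bounds(timestamp, size=WINDOW_SIZE):
    """Return the (start, end) of the fixed window containing `timestamp`."""
    start = timestamp - (timestamp % size)
    return start, start + size


# An event 95 seconds after the epoch falls in the first 10-minute window;
# one at 1250 seconds falls in the third.
print(fixed_window_bounds(95))    # (0, 600)
print(fixed_window_bounds(1250))  # (1200, 1800)
```

Each distinct (start, end) pair becomes a separate group of output files, which is exactly the pattern in the listing below.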

You'd see files like the following (supposing the output path is gs://mybucket/). The files are written out as each window's pane is triggered:

gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0000-00002
gs://mybucket/output-1970-01-01T00_00_00-1970-01-01T00_10_00-0001-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0000-00002
gs://mybucket/output-1970-01-01T00_10_00-1970-01-01T00_20_00-0001-00002
...

By default, the files have names of the form $prefix-$start-$end-$pane-$shard-of-$numShards$suffix$compressionSuffix, where prefix is output by default; you can pass a custom function for file naming.


If you want to customize how the files are written (e.g. the naming of the files, the format of the data, or anything like that), look at the extra arguments to WriteToFiles.

You can see an example of the transform being used with more complex arguments in a Beam test - but it sounds like the default behavior should be enough for you.

Pablo answered Dec 31 '22 18:12
