
Writing an unbounded collection to GCS

I have seen many questions on this topic, but I am still having problems writing to GCS. I am reading messages from a Pub/Sub topic and trying to write them to GCS. I referred to this link, but I couldn't find IOChannelUtils in the latest Beam packages.

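PCollection<String> details = pipeline
        // The topic path was not preserved in the original; "topic" is a placeholder.
        .apply(PubsubIO.readStrings().fromTopic(topic));

PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
    public String apply(String s) {
        return "constant";  // one shared key, so GroupByKey collects each window's elements together
    }
}));

PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream
        .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN))
                .withAllowedLateness(ONE_DAY)
                .discardingFiredPanes())  // newer Beam versions reject allowed lateness > 0 without an accumulation mode
        .apply(GroupByKey.create());

PCollection<Iterable<String>> windows = keyedWindows.apply(Values.<Iterable<String>>create());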

I put this together from several similar questions on Stack Overflow. I understand that TextIO does support writing an unbounded PCollection via withWindowedWrites and withNumShards.

Ref: Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn

But I don't understand how I should do this.

I am trying to write to GCS as follows.

FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
            StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");


I don't have enough reputation to comment on those questions, so I am asking this as a separate question.

asked Jan 30 '23 20:01 by Balu


2 Answers

I was able to solve this by changing the windowing as shown below:

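PCollection<String> streamedDataWindows = streamedData.apply(Window.<String>into(new GlobalWindows())
        // The rest of this statement was cut off in this copy. A global window never ends, so a
        // streaming write needs a repeating trigger such as this one (trigger and delay are assumptions).
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .discardingFiredPanes());

streamedDataWindows.apply(TextIO.write()
        .to(CLOUD_STORAGE)
        .withWindowedWrites()
        .withNumShards(1)
        .withFilenamePolicy(new PerWindowFiles()));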

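public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

    @Override
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
        // ... (method body not preserved in this copy of the answer)
    }

    // ... (remaining members, such as unwindowedFilename, not preserved)
}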

Although this works, I am still not sure about the windowing concept here. I will add more details as and when I find them. If anyone understands this better, please add more details. Thanks.
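To make the windowing concept a bit more concrete: with FixedWindows.of(ONE_MIN), each element is assigned, by its event timestamp, to exactly one one-minute window, and with the default trigger a window's results are emitted once the watermark passes its end. With GlobalWindows every element shares a single window that never ends, which is why output then depends entirely on a trigger. Below is a minimal, Beam-free Java sketch of fixed-window assignment, assuming timestamps in epoch milliseconds; FixedWindowSketch and its methods are hypothetical names for illustration, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Beam-free illustration of fixed windowing: every element's event
// timestamp (epoch millis) falls into exactly one window [start, start + size).
// These names are NOT Beam APIs; they only mimic what a fixed-window assigner does.
final class FixedWindowSketch {

    private FixedWindowSketch() {}

    /** Start of the fixed window of length sizeMillis that contains timestampMillis. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for timestamps before the epoch as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Exclusive end of that same window. */
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    /** Groups timestamps by the start of their fixed window, ordered by window start. */
    static Map<Long, List<Long>> groupByWindow(long[] timestampsMillis, long sizeMillis) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestampsMillis) {
            windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }
}
```

For instance, with a one-minute size, timestamps 1000 and 30000 share the window starting at 0, while 61000 falls into the window starting at 60000. Under GlobalWindows, all of them would land in the same single window.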

answered Feb 01 '23 10:02 by Balu


Check out this Pub/Sub to GCS pipeline, which provides a full example of writing windowed files to GCS.
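Whichever example you follow, a per-window filename policy (such as the PerWindowFiles class in the previous answer) boils down to a function from a window's bounds and a shard index to a unique file name, so that each window's output lands in its own files. Below is a hypothetical, Beam-free Java sketch of such a naming scheme; PerWindowNames, the UTC timestamp format, and the name layout are illustrative assumptions, not the layout used by Beam or by the linked pipeline.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical, Beam-free sketch of a per-window naming scheme: each name combines a
// prefix, the window's start and end (in UTC), and the shard index, so every window
// and every shard within it maps to a distinct file name.
final class PerWindowNames {

    private PerWindowNames() {}

    // Compact UTC timestamps without characters that are awkward in object names.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss").withZone(ZoneOffset.UTC);

    /** Builds prefix-start-end-shard-of-numShards plus extension, e.g. for one window's shard. */
    static String windowedFilename(String prefix, long windowStartMillis, long windowEndMillis,
                                   int shardNumber, int numShards, String extension) {
        return String.format("%s-%s-%s-%d-of-%d%s",
                prefix,
                FORMAT.format(Instant.ofEpochMilli(windowStartMillis)),
                FORMAT.format(Instant.ofEpochMilli(windowEndMillis)),
                shardNumber, numShards, extension);
    }
}
```

For example, windowedFilename("out", 0L, 60_000L, 0, 1, ".txt") returns out-19700101T000000-19700101T000100-0-of-1.txt. Note that under GlobalWindows there are no start/end bounds to put in the name, so a scheme like this only fits interval windows such as FixedWindows.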

answered Feb 01 '23 09:02 by Ryan McDowell
