
Reading from PubsubIO writing to DatastoreIO

Is it possible to create a pipeline that reads data from Pub/Sub and writes to Datastore? In my code I specify PubsubIO as the input and apply windowing to get a bounded PCollection, but it seems that DatastoreIO.writeTo cannot be used when options.setStreaming is set to true, even though streaming is required in order to use PubsubIO as the input. Is there a way around this, or is it simply not possible to read from Pub/Sub and write to Datastore?

Here's my code:

DataflowPipelineOptions options = PipelineOptionsFactory.create()
        .as(DataflowPipelineOptions.class);

options.setRunner(DataflowPipelineRunner.class);
options.setProject(projectName);
options.setStagingLocation("gs://my-staging-bucket/staging");
options.setStreaming(true);

Pipeline p = Pipeline.create(options);

// Read raw messages from the Pub/Sub topic.
PCollection<String> input = p.apply(
        PubsubIO.Read.topic("projects/" + projectName + "/topics/event-streaming"));

// Window into fixed 5-second windows with an element-count trigger.
PCollection<String> inputWindow = input.apply(
        Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)))
                .triggering(AfterPane.elementCountAtLeast(1))
                .discardingFiredPanes()
                .withAllowedLateness(Duration.standardHours(1)));

// Base64-decode each message payload.
PCollection<String> inputDecode = inputWindow.apply(ParDo.of(new DoFn<String, String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void processElement(ProcessContext c) {
        String msg = c.element();
        byte[] decoded = Base64.decodeBase64(msg.getBytes());
        String outmsg = new String(decoded);
        c.output(outmsg);
    }
}));

// Turn each decoded message into a Datastore entity.
PCollection<DatastoreV1.Entity> inputEntity = inputDecode.apply(
        ParDo.of(new CreateEntityFn("stream", "events")));

inputEntity.apply(DatastoreIO.writeTo(datasetid));

p.run();

And this is the exception I get:

Exception in thread "main" java.lang.UnsupportedOperationException: The Write transform is not supported by the Dataflow streaming runner.
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:488)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$StreamingWrite.apply(DataflowPipelineRunner.java:480)
at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:74)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:314)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:312)
at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:358)
at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:267)
at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:159)
at my.own.project.google.dataflow.EventStreamingDataflow.main(EventStreamingDataflow.java:104)
asked Jan 14 '16 by lilline

1 Answer

The DatastoreIO sink is not currently supported in the streaming runner. To write to Datastore from a streaming pipeline, you can make direct calls to the Datastore API from a DoFn.
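A rough sketch of that approach is below. It assumes the same v1beta2 Datastore client that provides the DatastoreV1.Entity protos used in the question; WriteToDatastoreFn is an illustrative name, not part of the SDK, and the exact DatastoreHelper/DatastoreOptions calls may differ depending on your client version.

// Sketch only: write each entity to Datastore directly from a DoFn,
// instead of using the (batch-only) DatastoreIO sink.
static class WriteToDatastoreFn extends DoFn<DatastoreV1.Entity, Void> {
    private static final long serialVersionUID = 1L;

    private final String datasetId;
    private transient Datastore datastore;

    WriteToDatastoreFn(String datasetId) {
        this.datasetId = datasetId;
    }

    @Override
    public void startBundle(Context c) throws Exception {
        // Build a Datastore client once per bundle. How you obtain
        // DatastoreOptions/credentials depends on your setup; the helper
        // method name may vary between client versions.
        datastore = DatastoreFactory.get().create(
                DatastoreHelper.getOptionsfromEnv().dataset(datasetId).build());
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        // One non-transactional insert per element. If CreateEntityFn
        // assigns complete keys, use addUpsert(...) instead of addInsertAutoId(...).
        DatastoreV1.CommitRequest commit = DatastoreV1.CommitRequest.newBuilder()
                .setMode(DatastoreV1.CommitRequest.Mode.NON_TRANSACTIONAL)
                .setMutation(DatastoreV1.Mutation.newBuilder()
                        .addInsertAutoId(c.element()))
                .build();
        datastore.commit(commit);
    }
}

In the pipeline, replace the DatastoreIO.writeTo step with:

inputEntity.apply(ParDo.of(new WriteToDatastoreFn(datasetid)));

Batching several mutations per commit (for example, buffering in processElement and committing in finishBundle) would reduce the number of Datastore RPCs.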

answered by danielm