Insert PubSub messages into BigQuery through Google Cloud Dataflow

I would like to insert the data of Pub/Sub messages coming from a topic into a BigQuery table using Google Cloud Dataflow. The pipeline runs without errors, but in the BigQuery table I see unreadable strings like " ߈���". This is my pipeline:

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
 .apply(ParDo.named("Transformation").of(new StringToRowConverter()))
 .apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

and my simple StringToRowConverter function is:

class StringToRowConverter extends DoFn<String, TableRow> {
    private static final long serialVersionUID = 0;

    @Override
    public void processElement(ProcessContext c) {
        // Split the message on commas and emit one row per non-empty token.
        for (String word : c.element().split(",")) {
            if (!word.isEmpty()) {
                System.out.println(word);
                c.output(new TableRow().set("data", word));
            }
        }
    }
}

And this is the message I sent through a POST request:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
  "messages": [
    {
      "attributes": {
        "key": "tablet, smartphone, desktop",
        "value": "eng"
      },
      "data": "34gf5ert"
    }
  ]
}

What am I missing? Thank you!

Leonardo Biagioli asked Sep 17 '15 14:09

1 Answer

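According to https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage , the "data" field of a Pub/Sub message sent through the REST API is base64-encoded. PubsubIO in Dataflow, by default, uses the String UTF-8 coder on the decoded bytes. The example string you provided, "34gf5ert", when base64-decoded and then interpreted as a UTF-8 string, gives exactly "߈���". So the value you put in "data" has to be the base64 encoding of the text you want to see in BigQuery, not the text itself.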
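The effect can be reproduced outside Dataflow. The following is a minimal sketch that uses only java.util.Base64 from the JDK, not the Dataflow SDK; the class and method names are made up for illustration. It decodes the string from the question the way the REST "data" field is treated, and then shows the value that would probably need to be published instead.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Illustrative class name; not part of the Dataflow SDK or the Pub/Sub client.
public class Base64PayloadDemo {

    // Base64-decode the REST "data" value, then read the bytes as UTF-8,
    // which is what a String UTF-8 coder would do with the payload.
    static String decodeAsUtf8(String base64Data) {
        byte[] raw = Base64.getDecoder().decode(base64Data);
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Produce the value to put in "data" so that a subscriber sees `text`.
    static String encodeForPublish(String text) {
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // "34gf5ert" happens to be valid base64, so it decodes to six
        // arbitrary bytes that are mostly not valid UTF-8.
        System.out.println(decodeAsUtf8("34gf5ert"));

        // Encoding the intended text first makes the round trip lossless.
        String data = encodeForPublish("34gf5ert");
        System.out.println(data);               // MzRnZjVlcnQ=
        System.out.println(decodeAsUtf8(data)); // 34gf5ert
    }
}
```

Under that assumption, sending "data": "MzRnZjVlcnQ=" in the POST body should make the row in BigQuery contain 34gf5ert.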

jkff answered Nov 05 '22 13:11
