I would like to insert PubSub messages data coming from a topic into a BigQuery table using Google Cloud Dataflow. Everything works great but in the BigQuery table I can see unreadable strings like " ߈���". This is my pipeline:
p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
and my simple StringToRowConverter function is:
class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split(",")) {
if (!word.isEmpty()) {
System.out.println(word);
c.output(new TableRow().set("data", word));
}
}
}
}
And this is the message I sent through a POST request:
POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
"messages": [
{
"attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
},
"data": "34gf5ert"
}
]
}
What am I missing? Thank you!
Pub/Sub service account permissions To create a BigQuery subscription, the Pub/Sub service account must have permission to write to the specific BigQuery table and to read the table metadata.
We are now making support for the Storage Write API in Dataflow available by providing two additional methods to the BigQueryIO connector. You have a choice of using a method with exactly-once semantics of inserting data into BigQuery or a lower latency and potentially cheaper method with at-least-once semantics.
Pull the messages from the subscriptionIn the Google Cloud console, go to the Pub/Sub subscriptions page. In the Messages tab, click Pull .
According to https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage , the JSON payload of the pubsub message is base64 encoded. PubsubIO in Dataflow, by default, uses the String UTF8 coder. The example string you provided "34gf5ert", when base64-decoded and then interpreted as an UTF-8 string, gives exactly "߈���".
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With