We use protobuf with GCP's Pub/Sub and Dataflow. We define both the data we send to Pub/Sub and the BigQuery schema in a single proto file.
publisher -(send proto)-> Pub/Sub -> Dataflow -(write)-> BigQuery
Sometimes Dataflow makes cosmetic changes, but it mostly just copies fields from the protobuf to BigQuery.
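For context, the shared proto definition looks roughly like this (a minimal sketch; the outer class name is implied by Core.MyProtoObject, while the field names and types are assumptions inferred from the getters in the Java code below):

// core.proto - hypothetical reconstruction; actual field types may differ
syntax = "proto3";

option java_outer_classname = "Core";

message Foo {
  int64 id = 1;   // assumed numeric, based on foo.getId()
  string bar = 2; // assumed string
  string baz = 3; // assumed string
}

message MyProtoObject {
  Foo foo = 1;
}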
My question is: is there a way to automatically convert the protobuf model to BigQuery's TableRow?
The simplified Dataflow code we have now is below. I want to eliminate most of the code in the ProtoToTableRow class:
public class MyPipeline {
  public static void main(String[] args) {
    PCollection<Core.MyProtoObject> events = pipeline.apply("ReadEvents",
        PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));

    events.apply("ConvertToTableRows", ParDo.of(new ProtoToTableRow()))
        .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
            .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
            .withExtendedErrorInfo()
            .to(table));
  }
}
// I want this class to be super thin!
class ProtoToTableRow extends DoFn<Core.MyProtoObject, TableRow> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    Core.Foo foo = c.element().getFoo();
    TableRow fooRow = new TableRow()
        .set("id", foo.getId())
        .set("bar", foo.getBar())
        .set("baz", foo.getBaz());
    // similar code repeated for 100s of lines
    TableRow row = new TableRow()
        .set("foo", fooRow);
    c.output(row);
  }
}
You can accomplish this in a very cool way. Beam provides schema inference for various classes, including Java Beans, AutoValue classes, and Protocol Buffers.
For your pipeline, you would not need to convert to TableRow at all; you could do something like this:
pipeline.getSchemaRegistry().registerSchemaProvider(
    Core.MyProtoObject.class, new ProtoMessageSchema());

PCollection<Core.MyProtoObject> events = pipeline.apply("ReadEvents",
    PubsubIO.readProtos(Core.MyProtoObject.class).fromSubscription(subscription));

events.apply("WriteToBigQuery", BigQueryIO.<Core.MyProtoObject>write()
    .useBeamSchema()
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
    .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
    .withExtendedErrorInfo()
    .to(table));
Note the useBeamSchema() call on BigQueryIO.write() - this is what enables the automatic conversion from your proto objects to BigQuery rows via the inferred Beam schema.
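If you want to double-check what Beam inferred from the proto before relying on the sink, you can query the schema registry (a small sketch; getSchema throws the checked NoSuchSchemaException, so declare or catch it in the enclosing method):

// Optional sanity check: print the Beam schema inferred from the proto
// so you can compare it field-by-field against the BigQuery table.
Schema inferred = pipeline.getSchemaRegistry().getSchema(Core.MyProtoObject.class);
System.out.println(inferred);

One more thing to watch: ProtoMessageSchema lives in the protobuf extensions module (beam-sdks-java-extensions-protobuf), so make sure that artifact is on your classpath.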