Context
I'm working with a streaming pipeline that has a protobuf data source in PubSub. I want to parse this protobuf into a Python dict because the data sink requires the input to be a collection of dicts. I had successfully developed a protobuf parser by initializing the protobuf message inside the process function of the DoFn.
However, I wanted to know: is it possible to make a generic ProtobufParser DoFn in Beam? A generic DoFn is useful from an engineering perspective to avoid re-implementing existing functions and to enable code reuse. In Java we can use generics, so implementing this generic ProtobufParser in Java is relatively easy. Since Python classes are first-class objects, I was wondering whether it's possible to pass a protobuf schema class (not a message instance object) into a DoFn. I tried to do this, but I kept failing.
Below is my current, working protobuf parser. The protobuf message is initialized inside the process function.
class ParsePubSubProtoToDict(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from datapipes.protos.data_pb2 import DataSchema
        from google.protobuf.json_format import MessageToDict

        message = DataSchema()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)
        yield obj
While it's good that the above protobuf DoFn parser works, it isn't generalized to all protobuf schemas, so a new DoFn parser would have to be re-implemented for every different protobuf schema.
To make the parser generalizable to all protobuf schemas, I tried to pass the protobuf schema (which is generated as a class in Python) to the DoFn.
class ParsePubSubProtoToDict(beam.DoFn):
    def __init__(self, proto_class):
        self.proto_class = proto_class

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self.proto_class()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)
        yield obj
def run_pubsub_to_gbq_pipeline(argv):
    ...
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
         'Print Result' >> beam.Map(lambda x: print_data(x)))
I tried this and other similar techniques; however, all my attempts fail with the same error message:
pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema
From this error message, I have two hypotheses about why the problem occurs:
1. The protobuf schema class is unserializable. However, this hypothesis is probably wrong: while I'm aware that pickle can't serialize the protobuf schema class, I was able to serialize it with dill (see the sketch after this list). Aside from that, I'm still a bit unsure how a DoFn in Python Beam implements serialization (e.g. when it uses dill or pickle to serialize things, what the serialized format of an object has to look like for it to be serializable and compatible with DoFn, etc.).
2. An import error inside the DoFn class. I have encountered several import-error problems with Python Beam due to function/class scope and Dataflow workers; to solve them, I had to import the package locally in the function where it's needed, not globally at module level. So maybe, when we pass the protobuf schema class to the DoFn, the schema import actually happens outside the DoFn, and hence the DoFn can't resolve the class name correctly inside itself?
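Here is a minimal sketch (outside of any pipeline) of the serialization check behind hypothesis 1; it only assumes the generated module is importable as datapipes.protos.data_pb2. On my setup pickle refuses the generated class while dill handles it:

import pickle

import dill
from datapipes.protos.data_pb2 import DataSchema

try:
    # pickle serializes classes by reference (module + name); for the
    # protoc-generated class this lookup is what fails for me.
    pickle.dumps(DataSchema)
except pickle.PicklingError as e:
    print("pickle failed:", e)

# dill serializes the class object itself, so the class is not
# inherently unserializable.
payload = dill.dumps(DataSchema)
restored_class = dill.loads(payload)
print(restored_class.DESCRIPTOR.full_name)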
My questions would be:

1. Why does passing the protobuf schema class to the DoFn fail, and how can it be solved?
2. How can I make an object passed to a DoFn (through __init__) serializable? Is there a Serializable class in Beam that I could inherit from so that I can turn my unserializable objects into serializable ones?

Thanks a lot! Your help would be greatly appreciated.
I actually found an alternative solution for creating a generic protobuf parser with beam.Map.
from google.protobuf.json_format import MessageToDict

def convert_proto_to_dict(data, schema_class):
    message = schema_class()

    if isinstance(data, (str, bytes)):
        message.ParseFromString(data)
    else:
        message = data

    return MessageToDict(message, preserving_proto_field_name=True)
def run_pubsub_to_gbq_pipeline(argv):
    ... options initialization
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
         'Print Result' >> beam.Map(lambda x: print_data(x)))
So first, I created a function that receives a protobuf schema class and the protobuf data (currently as byte strings) as arguments. This function initializes and parses the byte-string data into a protobuf message and converts the protobuf message into a Python dictionary.
This function is then used by beam.Map, so now I'm able to have a generic protobuf parser in Beam without beam.DoFn. However, I'm still curious about why the protobuf schema class is problematic when used with a DoFn, so if you know why and how to solve it, please share your answer here. Thanks!
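For reference, one DoFn-style direction that follows from hypothesis 2 above would be to pass the class's import path (a plain string, which pickles fine) instead of the class itself, and resolve the class on the worker in DoFn.setup. This is only a rough, untested sketch; the class name ParseProtoToDict and the proto_class_path parameter are just illustrative:

import importlib

import apache_beam as beam


class ParseProtoToDict(beam.DoFn):
    def __init__(self, proto_class_path):
        # e.g. "datapipes.protos.data_pb2.DataSchema"; a plain string is
        # trivially picklable, unlike the generated class object itself.
        self.proto_class_path = proto_class_path
        self._proto_class = None

    def setup(self):
        # Resolve the class on the worker, inside the DoFn, mirroring the
        # local-import trick mentioned in hypothesis 2.
        module_path, class_name = self.proto_class_path.rsplit(".", 1)
        self._proto_class = getattr(importlib.import_module(module_path), class_name)

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self._proto_class()
        message.ParseFromString(element)
        yield MessageToDict(message, preserving_proto_field_name=True)

It could then be applied as beam.ParDo(ParseProtoToDict("datapipes.protos.data_pb2.DataSchema")). I haven't verified this on Dataflow, so treat it as a possible direction rather than a confirmed fix.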