How to make a generic Protobuf parser DoFn in Python Beam?

Context
I'm working with a streaming pipeline that has a protobuf data source in Pub/Sub. I want to parse this protobuf into a Python dict because the data sink requires the input to be a collection of dicts. I had successfully developed a protobuf parser by initializing the protobuf message inside the process function of a DoFn.

Why a Generic Protobuf Parser is Needed

However, I wanted to know: is it possible to make a generic ProtobufParser DoFn in Beam? A generic DoFn is useful from an engineering perspective, since it avoids re-implementing existing functions and enables code reuse. In Java, generics make implementing such a generic ProtobufParser relatively easy. Since Python classes are first-class objects, I was wondering whether it's possible to pass a protobuf schema class (not a message instance object) into a DoFn. I tried to do this, but I kept failing.

Successful Parser, with a Caveat: Not Generalizable

Below is my current, working protobuf parser. The protobuf message is initialized inside the process function.

class ParsePubSubProtoToDict(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # Local imports so that Dataflow workers resolve these modules at
        # execution time instead of at pipeline-construction time.
        from datapipes.protos.data_pb2 import DataSchema
        from google.protobuf.json_format import MessageToDict

        # Parse the raw Pub/Sub bytes into a protobuf message.
        message = DataSchema()
        message.ParseFromString(element)

        # Convert the message into a plain dict, keeping proto field names.
        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj

While the protobuf DoFn parser above works, it isn't generalized to all protobuf schemas, so a new DoFn parser has to be re-implemented for every different protobuf schema.

My Attempts

To make the parser generalizable to all protobuf schemas, I tried to pass the protobuf schema (which is generated as a class in Python) to the DoFn.

class ParsePubSubProtoToDict(beam.DoFn):
    def __init__(self, proto_class):
        # The schema class is stored on the DoFn, so Beam has to serialize
        # it when shipping the DoFn to workers.
        self.proto_class = proto_class

    def process(self, element, *args, **kwargs):
        from google.protobuf.json_format import MessageToDict

        message = self.proto_class()
        message.ParseFromString(element)
        obj = MessageToDict(message, preserving_proto_field_name=True)

        yield obj


def run_pubsub_to_gbq_pipeline(argv):
    ...
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to JSON' >> beam.ParDo(ParsePubSubProtoToDict(data_pb2.DataSchema().__class__)) |
         'Print Result' >> beam.Map(lambda x: print_data(x)))

I also tried other similar techniques, but all my attempts failed with the same error message:

pickle.PicklingError: Can't pickle <class 'data_pb2.DataSchema'>: it's not found as data_pb2.DataSchema

From this error message, I have two hypotheses about why the problem occurs:

  1. The protobuf schema class is unserializable. However, this hypothesis is probably wrong: while I'm aware that pickle can't serialize the protobuf schema class, dill was able to serialize it. Apart from that, I'm still unsure how a DoFn in Python Beam implements serialization (e.g. when it uses dill versus pickle, and what serialized form an object must take to be compatible with a DoFn). A minimal serializability check is sketched after this list.

  2. An import error inside the DoFn class. I have encountered several ImportError problems with Python Beam due to function/class scope and Dataflow workers; to solve them, I had to import the package locally in the function where it's needed, not globally in the module. So maybe, when we pass the protobuf schema class to the DoFn, the schema import actually happens outside of the DoFn, and the DoFn can't resolve the class name correctly? (Pickle serializes a class by reference, i.e. by module path plus class name, so the class must be importable under exactly that path wherever it is unpickled.)
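
To make hypothesis 1 concrete, here is a minimal serializability check (a sketch; apache_beam.internal.pickler is the internal module Beam uses to serialize DoFns and, to my knowledge, wraps dill, but being internal it may move between versions):

import pickle

import dill
from apache_beam.internal import pickler  # internal module; wraps dill (assumption)

from datapipes.protos.data_pb2 import DataSchema

# Try to serialize the generated class itself (not an instance) with each
# serializer and report which ones succeed.
for name, dumps in [('pickle', pickle.dumps),
                    ('dill', dill.dumps),
                    ('beam pickler', pickler.dumps)]:
    try:
        dumps(DataSchema)
        print(name, 'OK')
    except Exception as exc:  # e.g. pickle.PicklingError
        print(name, 'failed:', exc)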


My questions would be:

  1. Why does this error occur, and how can I resolve this error?
  2. Is it possible to pass a protobuf schema class to a DoFn? Or is there a better way to implement a generic protobuf-to-Python-dict parser DoFn without passing a protobuf schema class?
  3. How does a DoFn in Python work, and how do I ensure that the objects passed to a DoFn's constructor (__init__) are serializable? Is there a Serializable class in Beam that I could inherit from to turn my unserializable objects into serializable ones?

Thanks a lot! Your help would be greatly appreciated.

asked Apr 26 '19 by dekauliya


1 Answer

I actually found an alternative solution for creating a generic protobuf parser with beam.Map:

def convert_proto_to_dict(data, schema_class):
    from google.protobuf.json_format import MessageToDict

    message = schema_class()

    if isinstance(data, (str, bytes)):
        # Raw payload (e.g. from Pub/Sub): parse the serialized bytes.
        message.ParseFromString(data)
    else:
        # Already a protobuf message: use it as-is.
        message = data

    return MessageToDict(message, preserving_proto_field_name=True)


def run_pubsub_to_gbq_pipeline(argv):
    # ... options initialization
    from datapipes.protos import data_pb2

    with beam.Pipeline(options=options) as p:
        (p |
         'Read from PubSub' >> beam.io.ReadFromPubSub(subscription=pubsub_config["subscription"]) |
         'Proto to Dict' >> beam.Map(lambda x: convert_proto_to_dict(x, data_pb2.DataSchema)) |
         'Print Result' >> beam.Map(lambda x: print_data(x)))

So first, I created a function that receives a protobuf schema class and the protobuf data (currently byte strings) as arguments. The function initializes the schema class, parses the byte string into a protobuf message, and converts the message into a Python dictionary.
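
For illustration, the converter can also be exercised directly, outside a pipeline (the empty message below is just a stand-in, since the real fields of DataSchema aren't shown here):

from datapipes.protos import data_pb2

# Round-trip an (empty) message through the converter, both as serialized
# bytes (the Pub/Sub case) and as an already-parsed message object.
message = data_pb2.DataSchema()
payload = message.SerializeToString()

print(convert_proto_to_dict(payload, data_pb2.DataSchema))  # parsed from bytes
print(convert_proto_to_dict(message, data_pb2.DataSchema))  # passed through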

This function is then used with beam.Map, so I was able to build a generic protobuf parser on Beam without beam.DoFn. However, I'm still curious about why the protobuf schema class is problematic when used with a DoFn, so if you know why and how to solve this, please share your answer here, thanks!
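
For completeness, one possible workaround that keeps the DoFn approach is sketched below. The class name GenericProtoToDict is hypothetical, and it assumes the generated module is importable on the workers under the given dotted path: instead of passing the generated class itself, pass its importable module path and class name as plain strings, which pickle handles fine, and resolve the class lazily inside process.

import apache_beam as beam


class GenericProtoToDict(beam.DoFn):  # hypothetical name, not from the post
    def __init__(self, module_path, class_name):
        # Strings are trivially picklable, unlike the generated class object.
        self.module_path = module_path
        self.class_name = class_name

    def process(self, element, *args, **kwargs):
        # Local imports, following the pattern from the question, so the
        # generated module is resolved on the worker at execution time.
        import importlib
        from google.protobuf.json_format import MessageToDict

        proto_class = getattr(
            importlib.import_module(self.module_path), self.class_name)
        message = proto_class()
        message.ParseFromString(element)
        yield MessageToDict(message, preserving_proto_field_name=True)


# Usage in the pipeline above would then look like (module path assumed):
#   'Proto to Dict' >> beam.ParDo(
#       GenericProtoToDict('datapipes.protos.data_pb2', 'DataSchema'))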

answered Nov 03 '22 by dekauliya