I'm new to Apache Beam in Python with the Dataflow runner. I'm interested in creating a batch pipeline that publishes to Google Cloud Pub/Sub; I tinkered with the Beam Python APIs and found a solution. However, during my exploration I ran into some interesting problems that made me curious.
My working Beam pipeline, which publishes data from GCS in batch, currently looks like this:
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(PublishFn, self).__init__()

    def process(self, element, **kwargs):
        # Import and create the client inside process so it is built on
        # the worker and never pickled with the DoFn.
        from google.cloud import pubsub_v1

        publisher = pubsub_v1.PublisherClient()
        future = publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
def run_gcs_to_pubsub(argv):
    options = PipelineOptions(flags=argv)

    from datapipes.common.dataflow_utils import CsvFileSource
    from datapipes.protos import proto_schemas_pb2
    # Assumption: each CSV row is a dict matching MySchema; ParseDict
    # builds the proto message, which is then serialized to JSON.
    from google.protobuf.json_format import MessageToJson, ParseDict

    with beam.Pipeline(options=options) as p:
        normalized_data = (
            p
            | "Read CSV from GCS" >> beam.io.Read(
                CsvFileSource("gs://bucket/path/to/file.csv"))
            | "Normalize to Proto Schema" >> beam.Map(
                lambda data: MessageToJson(
                    ParseDict(data, proto_schemas_pb2.MySchema()),
                    indent=0,
                    preserving_proto_field_name=True))
        )

        (normalized_data
         | "Write to PubSub" >> beam.ParDo(
             PublishFn(topic_path="projects/my-gcp-project/topics/mytopic")))
Here, I attempted to make the publisher shared across DoFn instances. I tried the following approaches.
a. Initializing the publisher in the DoFn's __init__
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,   # One second
        )
        self.publisher = pubsub_v1.PublisherClient(batch_settings)
        self.topic_path = topic_path
        super(PublishFn, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    ...  # same as the working pipeline above
b. Initializing the publisher outside the DoFn and passing it in
class PublishFn(beam.DoFn):
    def __init__(self, publisher, topic_path):
        self.publisher = publisher
        self.topic_path = topic_path
        super(PublishFn, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()


def run_gcs_to_pubsub(argv):
    ...  # same as the working pipeline above

    batch_settings = pubsub_v1.types.BatchSettings(
        max_bytes=1024,  # One kilobyte
        max_latency=1,   # One second
    )
    publisher = pubsub_v1.PublisherClient(batch_settings)

    with beam.Pipeline(options=options) as p:
        ...  # same as the working pipeline above

        (normalized_data
         | "Write to PubSub" >> beam.ParDo(
             PublishFn(publisher=publisher,
                       topic_path="projects/my-gcp-project/topics/mytopic")))
Both attempts at sharing the publisher across DoFn instances failed with the following error messages:
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
and
File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__
My questions are:

1. Would the shared publisher implementation improve Beam pipeline performance? If so, I would like to explore this solution.
2. Why do the errors occur in my failing pipelines? Is it because I initialize a custom class object and pass it to the DoFn outside the process function? If so, how can I implement a pipeline that lets me reuse a custom object in a DoFn?
Thank you, your help would be greatly appreciated.
Okay, so Ankur has explained why my problem occurs and discussed how serialization is done on a DoFn. Based on this knowledge, I now understand that there are two solutions for making a custom object shared/reusable in a DoFn:

1. Make the custom object serializable: this allows the object to be initialized/available during DoFn creation (in __init__). The object must be serializable, since it gets serialized during pipeline submission, when the DoFn object is created (which calls __init__). How to achieve this is covered below in my answer. I also found that this requirement is documented in the Beam programming guide [1][2].
2. Initialize the non-serializable object in the DoFn's methods outside __init__, to avoid serialization, since methods other than __init__ aren't called during pipeline submission. How to accomplish this is explained in Ankur's answer.
References:
[1] https://beam.apache.org/documentation/programming-guide/#core-beam-transforms
[2] https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms
PublisherClient cannot be pickled correctly. More on pickling here. Initializing the PublisherClient in the process method avoids pickling it.
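A minimal sketch that reproduces the failure outside Beam (assuming google-cloud-pubsub is installed; the client holds a grpc channel, and pickling recurses into it):

import pickle

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()

# Pickling descends into the client's grpc channel, whose Cython type
# defines no __reduce__, so this should fail with the same error:
#   TypeError: no default __reduce__ due to non-trivial __cinit__
pickle.dumps(publisher)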
If the intent is to reuse the PublisherClient, I would recommend initializing the PublisherClient in the process method and storing it in self, using the following code:
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        super(PublishFn, self).__init__()

    def process(self, element, **kwargs):
        # Create the client lazily on the worker, on first use, so it is
        # never pickled with the DoFn.
        if not hasattr(self, 'publisher'):
            from google.cloud import pubsub_v1
            self.publisher = pubsub_v1.PublisherClient()
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()
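As a side note, the Beam Python SDK also provides a DoFn.setup() method, which runs once per DoFn instance on the worker after deserialization, making it a natural place for this kind of initialization. A minimal sketch, assuming an SDK version recent enough to support setup():

class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path
        self.publisher = None  # created on the worker, never pickled

    def setup(self):
        # Runs on the worker after the DoFn is deserialized, so the
        # client is constructed there and never goes through pickle.
        from google.cloud import pubsub_v1
        self.publisher = pubsub_v1.PublisherClient()

    def process(self, element, **kwargs):
        future = self.publisher.publish(self.topic_path, data=element.encode("utf-8"))
        return future.result()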
Thanks to Ankur, I discovered that this problem is due to Python's pickling. I then attempted to isolate the problem by first solving the pickling of PublisherClient on its own, and found a solution for sharing the PublisherClient across DoFn instances in Beam.

In Python, we can pickle custom objects with the dill package, and I realized that this package is already used in Beam's Python implementation for pickling objects. So I tried to troubleshoot and discovered this error:

TypeError: no default __reduce__ due to non-trivial __cinit__

Then I tried to fix this error, and my pipeline now works!
Below is the solution:
class PubsubClient(PublisherClient):
    def __reduce__(self):
        # Tell pickle how to rebuild this client: call the class again
        # with the same batch settings.
        return self.__class__, (self.batch_settings,)


# The DoFn to perform on each element in the input PCollection.
class PublishFn(beam.DoFn):
    def __init__(self, topic_path):
        self.topic_path = topic_path

        from google.cloud import pubsub_v1

        batch_settings = pubsub_v1.types.BatchSettings(
            max_bytes=1024,  # One kilobyte
            max_latency=1,   # One second
        )
        self.publisher = PubsubClient(batch_settings=batch_settings)
        super(PublishFn, self).__init__()

    def process(self, element, **kwargs):
        future = self.publisher.publish(topic=self.topic_path, data=element.encode("utf-8"))
        return future.result()
# ...the run_gcs_to_pubsub is the same as my successful pipeline
The solution works like this: first, I subclass PublisherClient and implement the __reduce__ function myself. Note that because I only used the batch_settings property to initialize my PublisherClient, this property is enough for my __reduce__ function. I then use this modified PubsubClient in my DoFn's __init__.
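To sanity-check this in isolation, the subclass can be round-tripped through pickle outside Beam. A small sketch (assumes google-cloud-pubsub is installed):

import pickle

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1 import PublisherClient

class PubsubClient(PublisherClient):
    def __reduce__(self):
        # Rebuild the client by calling the class with the same settings.
        return self.__class__, (self.batch_settings,)

batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,  # One kilobyte
    max_latency=1,   # One second
)

# Pickle stores only the class and the batch settings; unpickling calls
# PubsubClient(batch_settings) afresh, creating a new grpc channel on
# the receiving side instead of trying to pickle the old one.
client = PubsubClient(batch_settings)
restored = pickle.loads(pickle.dumps(client))
assert isinstance(restored, PublisherClient)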
Hopefully, with this new solution, my pipeline will see a performance improvement.