 

How to create groups of N elements from a PCollection Apache Beam Python

I am trying to accomplish something like this: Batch PCollection in Beam/Dataflow

The answer in the above link is in Java, whereas the language I'm working with is Python, so I need some help building a similar construct.

Specifically I have this:

 p = beam.Pipeline(options=pipeline_options)
 lines = p | 'File reading' >> ReadFromText(known_args.input)

After this, I need to create another PCollection containing lists of N rows of "lines", since my use case requires processing rows in groups; I cannot operate line by line.

I tried a ParDo function using an instance variable to count up to N rows, followed by a GroupBy using Map, but the counter resets every 1000 records (presumably at bundle boundaries), so that isn't the solution I'm looking for. I read the example in the link, but I don't know how to do something like that in Python.
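For reference, the batching logic itself is just plain list chunking, independent of Beam. A minimal pure-Python sketch of what each worker ultimately has to do (the `chunk` helper name is mine, not from Beam):

```python
def chunk(iterable, n):
    """Yield successive lists of up to n elements from iterable."""
    buffer = []
    for element in iterable:
        buffer.append(element)
        if len(buffer) == n:
            yield buffer
            buffer = []
    if buffer:
        # Flush any leftover partial batch at the end.
        yield buffer

print(list(chunk(range(7), 3)))  # [[0, 1, 2], [3, 4, 5], [6]]
```

The hard part in Beam is not this loop but where to put it, since elements arrive in bundles rather than as one iterable; the accepted answer below shows the DoFn version.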

I also tried saving the counters in Datastore; however, the speed difference between Dataflow's processing and Datastore reads/writes is quite significant.

What is the correct way to do this? I don't know how else to approach it. Regards.

Luis Felipe Muñoz asked Mar 26 '18

People also ask

What is PCollection in Apache Beam?

Each pipeline represents a single, repeatable job. A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline's data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline.

What is PCollection and PTransform in dataflow?

A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create ), and can be passed as the inputs of other PTransforms.

Which of the following element types can be encoded as a schema from a PCollection?

In many cases, the element type in a PCollection has a structure that can be introspected. Some examples are JSON, Protocol Buffer, Avro, and database row objects. All of these formats can be converted to Beam Schemas.

What is schema in Apache Beam?

Schemas provide us a type-system for Beam records that is independent of any specific programming-language type. There might be multiple Java classes that all have the same schema (for example a Protocol-Buffer class or a POJO class), and Beam will allow us to seamlessly convert between these types.


1 Answer

Assuming the grouping order is not important, you can simply group inside a DoFn.

class Group(beam.DoFn):
    def __init__(self, n):
        self._n = n
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)
        if len(self._buffer) == self._n:
            # Emit a full batch of n elements.
            yield list(self._buffer)
            self._buffer = []

    def finish_bundle(self):
        if self._buffer:
            # Emit any leftover partial batch when the bundle ends.
            # Note: newer Beam versions require values yielded from
            # finish_bundle to be wrapped in
            # apache_beam.utils.windowed_value.WindowedValue.
            yield list(self._buffer)
            self._buffer = []

lines = (p
         | 'File reading' >> ReadFromText(known_args.input)
         | 'Group' >> beam.ParDo(Group(known_args.N))
         ...
Jiayuan Ma answered Sep 22 '22