 

Collecting output from Apache Beam pipeline and displaying it to console

Tags:

apache-beam

I have been working with Apache Beam for a couple of days. I want to iterate quickly on the application I am working on and make sure the pipeline I am building is error free. In Spark we can use sc.parallelize, and when we apply an action we get back a value that we can inspect.

Similarly, while reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda kv: (kv[0], sum(kv[1]))))
    result = pipeline.run()

I actually want to print the result to the console, but I couldn't find any documentation on how to do this.

Is there a way to print the result to the console instead of saving it to a file each time?

Asked Sep 25 '17 by Shamshad Alam

People also ask

What are PCollection and PTransform in Dataflow?

A PCollection is the input and output of each PTransform. A PTransform is an operation performed on a PCollection as a whole, not on a single element: it takes an input PCollection and transforms it into zero or more output PCollections.

What is a PTransform in Apache Beam?

A PTransform is an object describing (not executing) a computation. The actual execution semantics for a transform are captured by a runner object. A transform object always belongs to a pipeline object.
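As a minimal sketch (the step names and data here are illustrative, not from the question), applying PTransforms only builds the pipeline; nothing executes until a runner is asked to run it:

import apache_beam as beam

pipeline = beam.Pipeline()

# Applying PTransforms only describes the computation; no data is processed yet.
numbers = pipeline | "create" >> beam.Create([1, 2, 3])
doubled = numbers | "double" >> beam.Map(lambda x: x * 2)

# Execution happens only when the runner is invoked.
result = pipeline.run()
result.wait_until_finish()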

What is CoGroupByKey used for?

CoGroupByKey aggregates all input elements by their key and allows downstream processing to consume all values associated with the key. While GroupByKey performs this operation over a single input collection, and thus a single type of input value, CoGroupByKey operates over multiple input collections.
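For example, a rough sketch (the sample data is made up) joining two keyed PCollections with CoGroupByKey could look like this:

import apache_beam as beam

with beam.Pipeline() as pipeline:
    emails = pipeline | "emails" >> beam.Create([("amy", "amy@example.com")])
    phones = pipeline | "phones" >> beam.Create([("amy", "555-1234"),
                                                 ("bob", "555-9876")])

    # CoGroupByKey joins both collections on the key; each output element is
    # (key, {"emails": [...], "phones": [...]}).
    joined = ({"emails": emails, "phones": phones}
              | beam.CoGroupByKey()
              | beam.Map(print))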


2 Answers

You don't need the temp list. In Python 2.7 the following should be sufficient:

def print_row(row):
    print row

(pipeline 
    | ...
    | "print" >> beam.Map(print_row)
)

result = pipeline.run()
result.wait_until_finish()

In Python 3.x, print is a function, so the following is sufficient:

(pipeline 
    | ...
    | "print" >> beam.Map(print)
)

result = pipeline.run()
result.wait_until_finish()
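Putting the pieces together, a self-contained sketch of this approach (the sample data and step names are illustrative) could look like:

import apache_beam as beam

# Using the pipeline as a context manager runs it and waits on exit,
# so no explicit run()/wait_until_finish() is needed.
with beam.Pipeline() as pipeline:
    (pipeline
     | "create" >> beam.Create(["this is test", "this is another test"])
     | "split" >> beam.FlatMap(lambda line: line.split(" "))
     | "pair" >> beam.Map(lambda w: (w, 1))
     | "count" >> beam.CombinePerKey(sum)
     | "print" >> beam.Map(print))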
Answered Oct 10 '22 by Oliver

After exploring further and understanding how to write test cases for my application, I figured out a way to print the result to the console. Please note that I am currently running everything on a single-node machine while trying to understand the functionality provided by Apache Beam and how I can adopt it without compromising industry best practices.

So, here is my solution. At the very last stage of the pipeline we can introduce a map function that will either print the result to the console or accumulate the result in a variable; later we can print the variable to inspect the values.

import apache_beam as beam

# let's have some sample strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.ParDo(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# let's collect our result into an output list with a map transformation
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# Run the pipeline
result = pipeline.run()

# let's wait until the result is available
result.wait_until_finish()

# print the output
print(output)
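Note that collecting results into a local Python list like this only works when the pipeline runs in-process (the local DirectRunner); on a distributed runner the list would live on the workers. For checking results in tests, Beam also ships assert_that and equal_to in apache_beam.testing.util; a rough sketch (the sample data is mine) could look like:

import apache_beam as beam
from apache_beam.testing.util import assert_that, equal_to

with beam.Pipeline() as pipeline:
    counts = (pipeline
              | "create" >> beam.Create(["a b", "a"])
              | "split" >> beam.FlatMap(lambda row: row.split(" "))
              | "pair" >> beam.Map(lambda w: (w, 1))
              | "group" >> beam.CombinePerKey(sum))

    # Fails the pipeline (and the test) if the PCollection does not match.
    assert_that(counts, equal_to([("a", 2), ("b", 1)]))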
Answered Oct 10 '22 by Shamshad Alam