Collecting output from an Apache Beam pipeline and displaying it on the console



I have been working with Apache Beam for a couple of days. I want to iterate quickly on the application I am working on and make sure the pipeline I am building is error free. In Spark we can use sc.parallelize, and when we apply an action we get a value that we can inspect.

Similarly, when I was reading about Apache Beam, I found that we can create a PCollection and work with it using the following syntax:

with beam.Pipeline() as pipeline:
    lines = pipeline | beam.Create(["this is test", "this is another test"])
    word_count = (lines 
                  | "Word" >> beam.ParDo(lambda line: line.split(" "))
                  | "Pair of One" >> beam.Map(lambda w: (w, 1))
                  | "Group" >> beam.GroupByKey()
                  | "Count" >> beam.Map(lambda (w, o): (w, sum(o))))
    result = pipeline.run()

I actually want to print the result to the console, but I couldn't find any documentation on how to do it.

Is there a way to print the result to the console instead of saving it to a file each time?

You don't need the temp list. In Python 2.7 the following should be sufficient:

def print_row(row):
    print row

    | ...
    | "print" >> beam.Map(print_row)

result = pipeline.run()

In Python 3.x, print is a function, so the following is sufficient:

    | ...
    | "print" >> beam.Map(print)

result = pipeline.run()

After exploring further and learning how to write test cases for my application, I figured out a way to print the result to the console. Please note that right now I am running everything on a single-node machine, trying to understand the functionality Apache Beam provides and how I can adopt it without compromising industry best practices.

So, here is my solution. At the very last stage of the pipeline we can add a map function that either prints each result to the console or accumulates the results in a variable, which we can print later to see the values.

import apache_beam as beam

# let's have some sample strings
data = ["this is sample data", "this is yet another sample data"]

# create a pipeline
pipeline = beam.Pipeline()
counts = (pipeline | "create" >> beam.Create(data)
    | "split" >> beam.FlatMap(lambda row: row.split(" "))
    | "pair" >> beam.Map(lambda w: (w, 1))
    | "group" >> beam.CombinePerKey(sum))

# let's collect our result with a map transformation into the output list
# (this relies on the pipeline running locally, in the same process as this script)
output = []
def collect(row):
    output.append(row)
    return True

counts | "print" >> beam.Map(collect)

# run the pipeline
result = pipeline.run()

# let's wait until the result is available
result.wait_until_finish()

# print the output
print(output)
