 

How do you test Beam pipeline (Google Dataflow) in Python?

I am having trouble understanding how we are supposed to test our pipeline using the Google Dataflow (based on Apache Beam) Python SDK.

https://beam.apache.org/documentation/pipelines/test-your-pipeline/
https://cloud.google.com/dataflow/pipelines/creating-a-pipeline-beam

The above links are only for Java. I am pretty confused as to why Google would point to Java documentation for testing Apache Beam pipelines.

I want to be able to view the results of a CoGroupByKey join on two PCollections. I am coming from a Python background, and I have little to no experience using Beam/Dataflow.

I could really use any help. I know this is open-ended to an extent; basically I need a way to view results from within my pipeline, and right now I cannot see the output of my CoGroupByKey join.

Code Below

    # dwsku and product are PCollections coming from BigQuery. product
    # contains nested values as well, but dwsku does not.
    d1 = {'dwsku': dwsku, 'product': product}
    results = d1 | beam.CoGroupByKey()
    print(results)

What is printed:

    PCollection[CoGroupByKey/Map(_merge_tagged_vals_under_key).None]
asked Nov 21 '17 by codebrotherone

People also ask

How do you test a Dataflow pipeline?

Testing a pipeline end-to-end: for every source of input data to your pipeline, create some known static test input data. Create some static test output data that matches what you expect in your pipeline's final output PCollection(s). Then create a TestPipeline in place of the standard Pipeline.create.
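The Python SDK has direct equivalents of the Java testing utilities: TestPipeline, assert_that, and equal_to live under apache_beam.testing. Below is a minimal sketch of an end-to-end test of a CoGroupByKey join; the input data, labels, and the normalize helper are made up for illustration:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    def normalize(element):
        # Turn (key, {'dwsku': [...], 'product': [...]}) into a
        # deterministic, sortable tuple so equal_to can compare it.
        key, grouped = element
        return (key, sorted(grouped['dwsku']), sorted(grouped['product']))

    def test_cogroupbykey_join():
        with TestPipeline() as p:
            # Known static test input data.
            dwsku = p | 'CreateDwsku' >> beam.Create([('sku1', 'a'), ('sku2', 'b')])
            product = p | 'CreateProduct' >> beam.Create([('sku1', 'x')])
            results = ({'dwsku': dwsku, 'product': product}
                       | beam.CoGroupByKey()
                       | beam.Map(normalize))
            # Static expected output for the final PCollection.
            assert_that(results, equal_to([
                ('sku1', ['a'], ['x']),
                ('sku2', ['b'], []),
            ]))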

How do you run a Dataflow pipeline in GCP?

You can run your job on managed Google Cloud resources by using the Dataflow runner service. Running your pipeline with Dataflow creates a Dataflow job, which uses Compute Engine and Cloud Storage resources in your Google Cloud project. Note: Typing Ctrl+C from the command line does not cancel your job.
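As a hedged sketch of what that looks like from the Python SDK (the project, region, and bucket names below are hypothetical placeholders):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # All resource names below are made-up placeholders.
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=my-gcp-project',
        '--region=us-central1',
        '--temp_location=gs://my-bucket/tmp',
    ])
    p = beam.Pipeline(options=options)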

How do you run a Dataflow pipeline?

If you are looking for a step-by-step guide on how to create and deploy your first pipeline, use Dataflow's quickstarts for Java, Python, Go, or templates. After you construct and test your Apache Beam pipeline, you can use the Dataflow managed service to deploy and execute it.


1 Answer

If you want to test it locally on your machine, start with the DirectRunner; then you will be able to debug the pipeline, either by printing logs or by stopping execution in a debugger.
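For example (a minimal sketch; the DirectRunner is also what Beam defaults to when no runner is specified):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Force local execution with the DirectRunner.
    options = PipelineOptions(['--runner=DirectRunner'])
    p = beam.Pipeline(options=options)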

In order to see the whole PCollection locally, you can do the following:

    d1 = {'dwsku': dwsku, 'product': product}
    results = d1 | beam.CoGroupByKey()

    def my_debug_function(pcollection_as_list):
        # Add a breakpoint in this function, or just print the list.
        print(pcollection_as_list)

    debug = (results | beam.combiners.ToList() | beam.Map(my_debug_function))

There are a few things to remember here:

  • the ToList() transform can potentially allocate a lot of memory, since it materializes the entire PCollection in one place
  • while using the DirectRunner, call the .wait_until_finish() method on your pipeline's run result, so that your script does not end before the pipeline finishes executing (see the sketch after this list)
  • if your pipeline reads data from BigQuery, put a LIMIT clause in the query when running locally
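Putting the last two points together, here is a hedged sketch of a local run; the BigQuery table name and query are hypothetical, and newer SDKs expose beam.io.ReadFromBigQuery in place of BigQuerySource:

    import apache_beam as beam

    p = beam.Pipeline()  # DirectRunner by default when run locally

    # Hypothetical read: the LIMIT keeps the local download small.
    dwsku = p | 'ReadDwsku' >> beam.io.Read(beam.io.BigQuerySource(
        query='SELECT sku, value FROM `my_project.my_dataset.dwsku` LIMIT 1000',
        use_standard_sql=True))

    # ... build the rest of the pipeline (CoGroupByKey, debug transforms) ...

    result = p.run()
    result.wait_until_finish()  # block until the local run completes

Wrapping the pipeline in a `with beam.Pipeline() as p:` block has the same effect, since the context manager runs the pipeline and waits for completion on exit.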
answered Sep 27 '22 by Marcin Zablocki