Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Apache Beam - Integration test with unbounded PCollection

We are building an integration test for an Apache Beam pipeline and are running into some issues. See below for context...

Details about our pipeline:

  • We use PubsubIO as our data source (unbounded PCollection)
  • Intermediate transforms include a custom CombineFn and a very simple windowing/triggering strategy
  • Our final transform is JdbcIO, using org.neo4j.jdbc.Driver to write to Neo4j

Current testing approach:

  • Run Google Cloud's Pub/Sub emulator on the machine that the tests are running on
  • Build an in-memory Neo4j database and pass its URI into our pipeline options
  • Run pipeline by calling OurPipeline.main(TestPipeline.convertToArgs(options)
  • Use Google Cloud's Java Pub/Sub client library to publish messages to a test topic (using Pub/Sub emulator), which PubsubIO will read from
  • Data should flow through the pipeline and eventually hit our in-memory instance of Neo4j
  • Make simple assertions regarding the presence of this data in Neo4j

This is intended to be a simple integration test which will verify that our pipeline as a whole is behaving as expected.

The issue we're currently having is that when we run our pipeline it is blocking. We are using DirectRunner and pipeline.run() (not pipeline.run().waitUntilFinish()), but the test seems to hang after running the pipeline. Because this is an unbounded PCollection (running in streaming mode), the pipeline does not terminate, and thus any code after it is not reached.

So, I have a few questions:

1) Is there a way to run a pipeline and then stop it manually later?

2) Is there a way to run a pipeline asynchronously? Ideally it would just kick off the pipeline (which would then continuously poll Pub/Sub for data) and then move on to the code responsible for publishing to Pub/Sub.

3) Is this method of integration testing a pipeline reasonable, or are there better methods that might be more straightforward? Any info/guidance here would be appreciated.

Let me know if I can provide any additional code/context - thanks!

like image 397
Chris Staikos Avatar asked Jun 23 '17 18:06

Chris Staikos


People also ask

What is PCollection in Apache beam?

Each pipeline represents a single, repeatable job. A PCollection represents a potentially distributed, multi-element dataset that acts as the pipeline's data. Apache Beam transforms use PCollection objects as inputs and outputs for each step in your pipeline.

What is PCollection and PTransform in dataflow?

A PCollection can contain either a bounded or unbounded number of elements. Bounded and unbounded PCollections are produced as the output of PTransforms (including root PTransforms like Read and Create ), and can be passed as the inputs of other PTransforms.

Which transform can help you create a PCollection out of in memory Java collections like list that can be useful for testing?

Using the Create Transform You can use the Create transform to create a PCollection out of a standard in-memory collection class, such as Java or Python List .


1 Answers

You can run the pipeline asynchronously using the DirectRunner by passing setting the isBlockOnRun pipeline option to false. So long as you keep a reference to the returned PipelineResult available, calling cancel() on that result should stop the pipeline.

For your third question, your setup seems reasonable. However, if you want to have a smaller-scale test of your pipeline (requiring fewer components), you can encapsulate all of your processing logic within a custom PTransform. This PTransform should take inputs that have been fully parsed from an input source, and produce outputs that are yet to be parsed for the output sink.

When this is done, you can use either Create (which will generally not exercise triggering) or TestStream (which may, depending on how you construct the TestStream) with the DirectRunner to generate a finite amount of input data, apply this processing PTransform to that PCollection, and use PAssert on the output PCollection to verify that the pipeline generated the outputs which you expect.

For more information about testing, the Beam website has information about these styles of tests in the Programming Guide and a blog post about testing pipelines with TestStream.

like image 138
Thomas Groh Avatar answered Sep 30 '22 04:09

Thomas Groh