
Using custom DataFlow unbounded source on DirectPipelineRunner

I'm writing a custom DataFlow unbounded data source that reads from Kafka 0.8. I'd like to run it locally using the DirectPipelineRunner. However, I'm getting the following stack trace:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
        at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
        at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)

This makes some sense, since I never registered an evaluator for my custom source.

Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK, it seems that evaluators are registered only for bounded sources. What's the recommended way to define and register an evaluator for a custom unbounded source?

Thomas asked Jan 22 '16 14:01


1 Answer

DirectPipelineRunner currently runs over bounded input only. We are actively working on removing this restriction, and expect to release support for unbounded input shortly.

In the meantime, you can trivially turn any UnboundedSource into a BoundedSource, for testing purposes, by using withMaxNumRecords, as in the following example:

UnboundedSource<String> unboundedSource = ...; // make a Kafka source
// p is the Pipeline; the read stops after 10 records, so the result is bounded
PCollection<String> boundedKafkaCollection =
    p.apply(Read.from(unboundedSource).withMaxNumRecords(10));

See this issue on GitHub for more details.
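
To make the effect of capping the record count concrete, here is a conceptual sketch in plain Java with no Dataflow SDK dependency. It is not how the SDK implements withMaxNumRecords; the class BoundedReadSketch and the helpers limit, endlessMessages, and drain are hypothetical names invented for this illustration, and the endless iterator merely stands in for a Kafka topic.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Conceptual illustration only: shows why capping the number of records
// turns a never-ending stream into a finite one that a batch-style
// runner can consume to completion.
public class BoundedReadSketch {

    // Wraps a (possibly endless) iterator and stops after maxNumRecords elements.
    static <T> Iterator<T> limit(final Iterator<T> source, final long maxNumRecords) {
        return new Iterator<T>() {
            private long emitted = 0;

            @Override
            public boolean hasNext() {
                return emitted < maxNumRecords && source.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                emitted++;
                return source.next();
            }
        };
    }

    // Hypothetical stand-in for an unbounded source: it never runs out of records.
    static Iterator<String> endlessMessages() {
        return new Iterator<String>() {
            private long offset = 0;

            @Override
            public boolean hasNext() {
                return true;
            }

            @Override
            public String next() {
                return "message-" + offset++;
            }
        };
    }

    // Reads an iterator to exhaustion; only safe when the iterator is bounded.
    static <T> List<T> drain(Iterator<T> it) {
        List<T> out = new ArrayList<T>();
        while (it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> records = drain(limit(endlessMessages(), 10));
        System.out.println(records.size() + " records read");
    }
}
```

Calling drain directly on endlessMessages() would never return, which loosely mirrors why a runner that expects bounded input cannot evaluate an unbounded read as-is.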


Separately, there are several ongoing efforts to contribute a Kafka connector. You may want to engage with us and other contributors about that via our GitHub repository.

Davor Bonaci answered Oct 14 '22 05:10