I'm writing a custom Dataflow unbounded data source that reads from Kafka 0.8. I'd like to run it locally using the DirectPipelineRunner. However, I'm getting the following stack trace:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(KafkaDataflowSource)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:700)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:252)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:662)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:374)
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:87)
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:174)
This makes some sense, as I haven't registered an evaluator for my custom source anywhere.
Reading https://github.com/GoogleCloudPlatform/DataflowJavaSDK, it seems that only evaluators for bounded sources are registered. What's the recommended way to define and register an evaluator for a custom unbounded source?
DirectPipelineRunner currently runs over bounded input only. We are actively working on removing this restriction, and expect to release that change shortly.
In the meantime, you can trivially turn any UnboundedSource into a BoundedSource, for testing purposes, by using withMaxNumRecords, as in the following example:
UnboundedSource<String> unboundedSource = ...; // make a Kafka source
PCollection<String> boundedKafkaCollection =
p.apply(Read.from(unboundedSource).withMaxNumRecords(10));
See this issue on GitHub for more details.
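To make the idea behind withMaxNumRecords concrete, here is a minimal, SDK-free sketch of capping an endless stream of records at a fixed count so that a bounded-only consumer can drain it. This is not the Dataflow SDK's implementation. MaxRecordsIterator, readAll, and BoundedReadDemo are hypothetical names made up for illustration, and a plain java.util.Iterator stands in for the Kafka source.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;

/** Hypothetical helper: caps a possibly endless iterator at a fixed number of records. */
final class MaxRecordsIterator<T> implements Iterator<T> {
    private final Iterator<T> source;
    private long remaining;

    MaxRecordsIterator(Iterator<T> source, long maxNumRecords) {
        if (maxNumRecords < 0) {
            throw new IllegalArgumentException("maxNumRecords must be >= 0");
        }
        this.source = source;
        this.remaining = maxNumRecords;
    }

    @Override
    public boolean hasNext() {
        // Check the cap first so the source is never polled once the limit is hit;
        // a real streaming source could block on that call.
        return remaining > 0 && source.hasNext();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        remaining--;
        return source.next();
    }

    /** Drains the capped iterator into a list; safe because the input is now finite. */
    static <T> List<T> readAll(Iterator<T> unbounded, long maxNumRecords) {
        List<T> out = new ArrayList<>();
        Iterator<T> capped = new MaxRecordsIterator<>(unbounded, maxNumRecords);
        while (capped.hasNext()) {
            out.add(capped.next());
        }
        return out;
    }
}

/** Hypothetical demo: an infinite stream of fake messages stands in for Kafka. */
final class BoundedReadDemo {
    public static void main(String[] args) {
        Iterator<String> fakeKafka =
            Stream.iterate(0, i -> i + 1).map(i -> "message-" + i).iterator();
        List<String> firstTen = MaxRecordsIterator.readAll(fakeKafka, 10);
        System.out.println(firstTen.size() + " records read");
    }
}
```

The cap is checked before the underlying source is consulted, so the reader stops as soon as the limit is reached instead of waiting for another record that may never arrive.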
Separately, there are several efforts underway to contribute a Kafka connector. You may want to engage with us and other contributors about that via our GitHub repository.