I am trying to setup a very basic flink job. When I try to run, get the following error:
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)
The error is caused by the code below:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
streamExecutionEnvironment.execute("Test Job")
The error goes away when I add a print()
call to the end of the stream:
val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()
streamExecutionEnvironment.execute("Test Job")
I'm confused as to why print()
solves this issue. Is the idea that a streaming topology does not process any of its operators until a sink is introduced? Is print()
acting as a sink here? Any help would be appreciated. Thanks.
In programming language theory, lazy evaluation, or call-by-need is an evaluation strategy which delays the evaluation of an expression until its value is needed and which also avoids repeated evaluations. The opposite of lazy evaluation is eager evaluation, sometimes known as strict evaluation. The benefits of lazy evaluation include:
Lazy evaluation can lead to reduction in memory footprint, since values are created when needed. However, lazy evaluation is difficult to combine with imperative features such as exception handling and input/output, because the order of operations becomes indeterminate.
Generally, Flink divided operations into two class: transformations operations and sink operations. As you guess, Flink transformations are lazy, meaning that they are not executed until a sink operation is invoked.
Flink programs are regular programs that implement transformations on distributed collections (e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Collections are initially created from sources (e.g., by reading from files, Kafka topics, or from local, in-memory collections). Results are returned via sinks, which may, for example, write the data to (distributed) files, or to standard output (for example, the command line terminal).
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With