Flink: No operators defined in streaming topology. Cannot execute

I am trying to set up a very basic Flink job. When I try to run it, I get the following error:

Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1535)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:53)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
    at com.test.flink.jobs.TestJobRunnable$.run(TestJob.scala:223)

The error is caused by the code below:

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")

streamExecutionEnvironment.execute("Test Job")

The error goes away when I add a print() call to the end of the stream:

val streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val messageStream = streamExecutionEnvironment.addSource(kafkaConsumer)
messageStream.keyBy(_ => "S")
messageStream.print()

streamExecutionEnvironment.execute("Test Job")

I'm confused as to why print() solves this issue. Is the idea that a streaming topology does not process any of its operators until a sink is introduced? Is print() acting as a sink here? Any help would be appreciated. Thanks.

asked Mar 04 '19 05:03 by Luciano


1 Answer

In programming language theory, lazy evaluation, or call-by-need, is an evaluation strategy that delays the evaluation of an expression until its value is needed and also avoids repeated evaluations. The opposite of lazy evaluation is eager evaluation, sometimes known as strict evaluation. The benefits of lazy evaluation include:

  • The ability to define control flow (structures) as abstractions instead of primitives.
  • The ability to define potentially infinite data structures. This allows for more straightforward implementation of some algorithms.
  • Performance gains from skipping needless calculations, and avoidance of error conditions when evaluating compound expressions.
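To make the list above concrete, here is a small, Flink-independent sketch using the Scala standard library's `LazyList` (Scala 2.13 or later). The `LazyDemo` object and its `evaluated` counter are illustrative names only. The sketch is meant to show three things: a conceptually infinite sequence can be defined without computing anything, only the demanded elements are computed, and computed elements are cached rather than recomputed.

```scala
// Call-by-need in plain Scala: LazyList evaluates elements only on demand
// and memoizes them. Illustrative sketch, not Flink code.
object LazyDemo {
  // Counts how many times the mapping function has actually run.
  var evaluated = 0

  // A conceptually infinite sequence of squares; defining it computes nothing.
  val squares: LazyList[Int] = LazyList.from(1).map { n =>
    evaluated += 1
    n * n
  }

  // Demanding three elements computes only those three squares.
  def firstThree: List[Int] = squares.take(3).toList
}
```

Calling `LazyDemo.firstThree` returns `List(1, 4, 9)` and leaves `evaluated` at 3; calling it a second time does not increase the counter, because `LazyList` memoizes the elements it has already computed.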

Lazy evaluation can reduce the memory footprint, since values are created only when needed. However, it is difficult to combine with imperative features such as exception handling and input/output, because the order of operations becomes indeterminate.
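The drawback mentioned above, that side effects no longer happen in the order the code is written, can be seen with a lazy collection view. This is an illustrative sketch (the `LazyEffects` object and `run` method are hypothetical names, and only the standard-library `view` is used). It logs each step, and the mapping function's log entries appear only when the view is forced with `toList`, after a line that was written later in the source.

```scala
import scala.collection.mutable.ListBuffer

// Illustrative sketch: with a lazy view, side effects inside map run when the
// view is forced, not at the line where map is written.
object LazyEffects {
  def run(): List[String] = {
    val log = ListBuffer.empty[String]
    // Defining the view records the mapping but runs none of it.
    val doubled = List(1, 2, 3).view.map { n =>
      log += s"map $n"
      n * 2
    }
    log += "view defined"
    // Forcing the view is what actually executes the mapping function.
    val result = doubled.toList
    log += s"forced: ${result.mkString(",")}"
    log.toList
  }
}
```

`LazyEffects.run()` returns `List("view defined", "map 1", "map 2", "map 3", "forced: 2,4,6")`: the "map" entries come after "view defined" even though `map` appears first in the code.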

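Broadly, Flink divides operations into two classes: transformations and sinks. As you guessed, Flink transformations are lazy: calling them only describes the job, and nothing runs until execute() is called. In the first snippet nothing is ever connected to a sink, so when execute() builds the stream graph it finds no operators and throws the exception you saw. print() is indeed a sink: it writes every record of the stream to standard output. Note that in the second snippet print() is attached to messageStream itself, so the result of keyBy is still unused; to print the keyed stream, chain the sink onto it, e.g. messageStream.keyBy(_ => "S").print().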
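The behaviour described above can be modelled in a few lines of plain Scala. This is a hypothetical, deliberately simplified toy, not Flink's implementation: `ToyEnv`, `ToyStream`, `register` and the other members are invented names that only loosely echo Flink's API, and the exception message is copied from the stack trace in the question. In this model, transformations only wrap a recipe for producing records, attaching a sink is the only thing that registers work with the environment, and `execute()` fails when nothing has been registered. Real Flink builds a full stream graph; in the actual job the fix is simply to attach a sink such as `print()` to the stream you want to run.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical toy model, NOT Flink's API: transformations are recorded lazily
// and execute() only runs pipelines that end in a sink.
final class ToyEnv {
  // Jobs registered by sinks; transformations never add anything here.
  private val jobs = ListBuffer.empty[() => Unit]

  // Defining a source only stores how to read it; no data is read yet.
  def fromElements[T](xs: T*): ToyStream[T] = new ToyStream(this, () => xs.iterator)

  def register(job: () => Unit): Unit = jobs += job

  // Mirrors the error from the question when no sink was ever attached.
  def execute(): Unit = {
    if (jobs.isEmpty)
      throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.")
    jobs.foreach(job => job())
  }
}

// A stream is only a recipe for producing an iterator of records.
final class ToyStream[T](env: ToyEnv, pull: () => Iterator[T]) {
  // Transformations wrap the recipe and touch no data.
  def map[U](f: T => U): ToyStream[U] = new ToyStream(env, () => pull().map(f))
  def filter(p: T => Boolean): ToyStream[T] = new ToyStream(env, () => pull().filter(p))

  // Attaching a sink is what registers work with the environment.
  def addSink(sink: T => Unit): Unit = env.register(() => pull().foreach(sink))
  def print(): Unit = addSink(record => println(record))
}
```

For example, `env.fromElements("a", "b").map(_.toUpperCase)` followed by `env.execute()` throws the `IllegalStateException`, just like the job in the question, while ending the chain with `.print()` before calling `env.execute()` prints `A` and `B`.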

Flink programs are regular programs that implement transformations on distributed collections (e.g., filtering, mapping, updating state, joining, grouping, defining windows, aggregating). Collections are initially created from sources (e.g., by reading from files, Kafka topics, or from local, in-memory collections). Results are returned via sinks, which may, for example, write the data to (distributed) files, or to standard output (for example, the command line terminal).

answered Sep 28 '22 06:09 by Soheil Pourbafrani