
Some questions about operator parallelism in Flink

I came across the example below on parallelism and have some related questions:

  1. Does setParallelism(5) set the parallelism to 5 only for sum, or for both flatMap and sum?

  2. Is it possible to set a different parallelism for different operators, such as 5 for sum and 10 for flatMap?

  3. As I understand it, keyBy partitions the DataStream into logical streams/partitions based on the keys. Suppose there are 10,000 distinct key values, so there are 10,000 different partitions. How many threads would process those 10,000 partitions? Just 5? And what happens if we don't call setParallelism(5)?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/parallel.html

final StreamExecutionEnvironment env =
  StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
  .flatMap(new LineSplitter())
  .keyBy(0)
  .timeWindow(Time.seconds(5))
  .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

YuFeng Shen asked Jun 08 '17 12:06




2 Answers

Calling setParallelism on an operator changes the parallelism of that specific operator only. Consequently, in your example, only the window operator is executed with a parallelism of 5, while the preceding flatMap operator runs with the default parallelism.

This also means that you can set a different parallelism for each operator. Be aware, however, that operators with different parallelism cannot be chained, and the change in parallelism entails a rebalance operation (similar to a shuffle) between them.
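As a rough sketch of what per-operator parallelism could look like for the job in the question, assuming the Flink 1.3-era DataStream API used there (keyBy(0) and timeWindow are deprecated in later releases). The LineSplitter body and the fromElements input are placeholders of mine, since the question elides both:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class PerOperatorParallelism {

    // Hypothetical splitter standing in for the LineSplitter used in the question.
    public static class LineSplitter
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder input; the question does not show the real source.
        DataStream<String> text = env.fromElements("to be or not to be");

        DataStream<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter()).setParallelism(10) // applies to flatMap only
            .keyBy(0)
            .timeWindow(Time.seconds(5))
            .sum(1).setParallelism(5);                      // applies to the window/sum operator only

        wordCounts.print(); // the sink keeps the environment's default parallelism

        env.execute("Word Count Example");
    }
}
```

Each setParallelism call attaches to the operator returned by the call immediately before it, which is why the original snippet only affected the window operator.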

If you want to set the parallelism for all operators, you have to do it via the ExecutionEnvironment#setParallelism API call (StreamExecutionEnvironment#setParallelism for a streaming job like this one).

The keyBy operation partitions the input stream into as many partitions as you have parallel operator instances, and makes sure that all elements with the same key end up in the same partition. So in your example, where you set the parallelism to 5, you end up with 5 partitions, not 10,000. Each partition can hold elements with many different keys. Without the setParallelism(5) call, the operator would use the default parallelism instead.
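To make the "many keys, few partitions" point concrete, here is a small plain-Java sketch that spreads 10,000 distinct keys over 5 partitions. It does not use Flink at all: the hash-modulo assignment in partitionFor is a simplification I am assuming for illustration (Flink itself routes keys through key groups with its own hash function), and the key names are made up.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyPartitionSketch {

    // Simplified stand-in for key-to-partition assignment (an assumption for
    // illustration only; this is not the hash function Flink uses internally).
    public static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Counts how many distinct keys land in each parallel partition.
    public static Map<Integer, Integer> keysPerPartition(int distinctKeys, int parallelism) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int i = 0; i < distinctKeys; i++) {
            String key = "word-" + i; // hypothetical key values
            counts.merge(partitionFor(key, parallelism), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = keysPerPartition(10_000, 5);
        // One line per partition; every partition serves many different keys.
        counts.forEach((partition, keys) ->
            System.out.println("partition " + partition + " handles " + keys + " keys"));
    }
}
```

The same key always maps to the same partition, so per-key state such as a running sum stays in one place even though a single parallel instance serves thousands of keys.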

Till Rohrmann answered Sep 21 '22 09:09


Execution Environment Level

As mentioned in the Flink documentation, Flink programs are executed in the context of an execution environment. An execution environment defines a default parallelism for all operators, data sources, and data sinks it executes. The execution environment's parallelism can be overridden by explicitly configuring the parallelism of an operator.

The default parallelism of an execution environment can be specified by calling the setParallelism() method. To execute all operators, data sources, and data sinks with a parallelism of 3, set the default parallelism of the execution environment as follows:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

Qi Wei answered Sep 21 '22 09:09