Spark Kafka Direct DStream - How many executors and RDD partitions in yarn-cluster mode if num-executors is set?


I am trying to use the Spark Kafka direct stream approach. It simplifies parallelism by creating as many RDD partitions as there are Kafka topic partitions, as stated in this doc. Based on my understanding, Spark will create one executor for each RDD partition to do the computation.
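
To verify that mapping, each batch's underlying RDD can be cast to HasOffsetRanges, as shown in the Kafka integration guide. A minimal sketch against the messages stream created in my code below (Spark 1.x Java API, where foreachRDD takes a Function<..., Void>):

    // needs org.apache.spark.streaming.kafka.HasOffsetRanges and OffsetRange
    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>()
    {
        public Void call(JavaPairRDD<String, String> rdd) throws Exception
        {
            // one OffsetRange per Kafka partition, i.e. one RDD partition per Kafka partition
            OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange range : ranges)
            {
                System.out.println(String.format("topic %s, partition %d, offsets [%d, %d)",
                    range.topic(), range.partition(), range.fromOffset(), range.untilOffset()));
            }
            return null;
        }
    });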

So when I submit the application in yarn-cluster mode and set the num-executors option to a value different from the number of partitions, how many executors will there be?

For example, with a Kafka topic that has 2 partitions, I set num-executors to 4:

export YARN_CONF_DIR=$HADOOP_HOME/client_conf

./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
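
(As far as I know, --num-executors is equivalent to setting the spark.executor.instances property.)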

I gave it a try and found that the number of executors is 4, and every executor reads and processes data from Kafka. Why? There are only 2 partitions in the Kafka topic. How can 4 executors read from a topic that has only 2 partitions?

Below are the details of the Spark application and its logs.

My Spark application, which prints the messages received from Kafka (in the flatMap method) on every executor:

    ... // setup elided: jssc, kafkaParams, and SPACE are created here; see the sketch after this block
    String brokers = args[0];
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
    kafkaParams.put("metadata.broker.list", brokers);

    JavaPairInputDStream<String, String> messages =
        KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topicsSet);

    JavaPairDStream<String, Integer> wordCounts =
        messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
        {
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
            {
                System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s", tuple._1(),
                    tuple._2())); // print the kafka message received  in executor
                return Arrays.asList(SPACE.split(tuple._2()));
            }

        }).mapToPair(new PairFunction<String, String, Integer>()
        {
            public Tuple2<String, Integer> call(String word) throws Exception
            {
                System.out.println(String.format("[word]: %s", word));
                return new Tuple2<String, Integer>(word, 1);
            }

        }).reduceByKey(new Function2<Integer, Integer, Integer>()
        {
            public Integer call(Integer v1, Integer v2) throws Exception
            {
                return v1 + v2;
            }

        });

    wordCounts.print();

    Runtime.getRuntime().addShutdownHook(new Thread(){
        @Override
        public void run(){
            System.out.println("gracefully shutdown Spark!");
            jssc.stop(true, true);
        }
    });
    jssc.start();
    jssc.awaitTermination();
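
For completeness, here is a minimal sketch of the setup the ... at the top elides; the app name and the 2-second batch interval are assumptions, not necessarily what I used:

    SparkConf sparkConf = new SparkConf().setAppName("playground.MainClass");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); // batch interval is an assumption
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    Pattern SPACE = Pattern.compile(" ");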

My Kafka topic has 2 partitions. The strings "hello hello world 1", "hello hello world 2", "hello hello world 3", ... are sent to the topic.

Topic: topic_2  PartitionCount:2    ReplicationFactor:2 Configs:
Topic: topic_2  Partition: 0    Leader: 3   Replicas: 3,1   Isr: 3,1
Topic: topic_2  Partition: 1    Leader: 1   Replicas: 1,2   Isr: 1,2
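
For reference, the description above comes from Kafka's topic tool; the ZooKeeper address here is an assumption:

kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic topic_2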

Web console: (Spark UI screenshot omitted)

console output of executor 1:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...

console output of executor 2:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...

console output of executor 3:

...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...
asked Jul 10 '15 by yzandrew


1 Answer

Each partition is operated on by one executor at a time (assuming you don't have speculative execution turned on).
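
(For reference, speculative execution is controlled by the spark.speculation setting, which defaults to false; it can be enabled with --conf spark.speculation=true on spark-submit.)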

If you have more executors than you do partitions, not all of them will be doing work on any given RDD. But as you noted, since a DStream is a sequence of RDDs, over time each executor will do some work.
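
One way to see this is to print each batch's partition count and compare it with the task count in the Spark UI. A minimal sketch against the asker's messages stream (same Spark 1.x Java API):

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>()
    {
        public Void call(JavaPairRDD<String, String> rdd) throws Exception
        {
            // with a 2-partition topic this prints 2: each batch runs only 2 tasks,
            // regardless of how many executors are registered
            System.out.println("partitions in this batch: " + rdd.partitions().size());
            return null;
        }
    });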

answered Oct 19 '22 by Cody Koeninger