I am trying to use the Spark Kafka direct stream approach. It simplifies parallelism by creating as many RDD partitions as there are Kafka topic partitions, as stated in this doc. Based on my understanding, Spark will create one executor for each RDD partition to do the computation.
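(As a side check, here is a minimal sketch, not part of the actual job shown further down, that logs how many partitions each micro-batch RDD has; with the direct stream this should equal the topic's partition count. It assumes a Spark version where foreachRDD accepts a VoidFunction, i.e. 1.6+; older versions use the Function<..., Void> overload. `messages` is the direct stream created in the code below.)

// Illustrative only: log the partition count of each batch RDD.
// With the direct stream approach this should equal the Kafka topic's partition count.
messages.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>()
{
    public void call(JavaPairRDD<String, String> rdd) throws Exception
    {
        System.out.println("[partition check] partitions in this batch: " + rdd.partitions().size());
    }
});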
So when I submit the application in yarn-cluster mode and set num-executors to a value different from the number of partitions, how many executors will there be?
For example, there is a Kafka topic with 2 partitions, and I set num-executors to 4:
export YARN_CONF_DIR=$HADOOP_HOME/client_conf
./bin/spark-submit \
--class playground.MainClass \
--master yarn-cluster \
--num-executors 4 \
../spark_applications/uber-spark-streaming-0.0.1-SNAPSHOT.jar \
127.0.0.1:9093,127.0.0.1:9094,127.0.0.1:9095 topic_1
I gave it a try and found that the number of executors is 4, and each executor reads and processes data from Kafka. Why? There are only 2 partitions in the Kafka topic. How do 4 executors read from a topic that only has 2 partitions?
Below are the details of the Spark application and logs.
My Spark application, which prints the messages received from Kafka (in the flatMap method) in every executor:
...
String brokers = args[0];
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(args[1].split(",")));
kafkaParams.put("metadata.broker.list", brokers);

JavaPairInputDStream<String, String> messages =
    KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topicsSet);

JavaPairDStream<String, Integer> wordCounts =
    messages.flatMap(new FlatMapFunction<Tuple2<String, String>, String>()
    {
        public Iterable<String> call(Tuple2<String, String> tuple) throws Exception
        {
            // print the kafka message received in this executor
            System.out.println(String.format("[received from kafka] tuple_1 is %s, tuple_2 is %s",
                tuple._1(), tuple._2()));
            return Arrays.asList(SPACE.split(tuple._2()));
        }
    }).mapToPair(new PairFunction<String, String, Integer>()
    {
        public Tuple2<String, Integer> call(String word) throws Exception
        {
            System.out.println(String.format("[word]: %s", word));
            return new Tuple2<String, Integer>(word, 1);
        }
    }).reduceByKey(new Function2<Integer, Integer, Integer>()
    {
        public Integer call(Integer v1, Integer v2) throws Exception
        {
            return v1 + v2;
        }
    });

wordCounts.print();

Runtime.getRuntime().addShutdownHook(new Thread()
{
    @Override
    public void run()
    {
        System.out.println("gracefully shutdown Spark!");
        jssc.stop(true, true);
    }
});

jssc.start();
jssc.awaitTermination();
My Kafka topic, with 2 partitions. Strings "hello hello world 1", "hello hello world 2", "hello hello world 3", ... are sent to the topic.
Topic: topic_2 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: topic_2 Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: topic_2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Web console:
console output of executor 1:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 12
[word]: hello
[word]: hello
[word]: world
[word]: 12
...
console output of executor 2:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 2
[word]: hello
[word]: hello
[word]: world
[word]: 2
...
console output of executor 3:
...
[received from kafka] tuple_1 is null, tuple_2 is hello hello world 3
[word]: hello
[word]: hello
[word]: world
[word]: 3
...
Each partition is operated on by one executor at a time (assuming you don't have speculative execution turned on).
If you have more executors than you do partitions, not all of them will be doing work on any given RDD. But as you noted, since a DStream is a sequence of RDDs, over time each executor will do some work.
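If the goal is to have all 4 executors participate in processing each batch, one common option (a hedged sketch, at the cost of an extra shuffle; `messages` refers to the direct stream from the question, and the count 4 is only for this example) is to repartition the stream right after creating it:

// Sketch: spread each batch over more partitions than the Kafka topic has,
// so all 4 executors can take part in the downstream transformations.
// Note that this introduces a shuffle of the raw messages.
JavaPairDStream<String, String> redistributed = messages.repartition(4);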