Kafka topic partitions to Spark streaming

I have some use cases that I would like clarified, regarding how Kafka topic partitioning maps to Spark Streaming resource utilization.

I use Spark standalone mode, so the only settings I have are "total number of executors" and "executor memory". As far as I know, and according to the documentation, the way to introduce parallelism into Spark Streaming is to use a partitioned Kafka topic: when I use the spark-kafka direct stream integration, the RDD will have the same number of partitions as the Kafka topic.
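
For reference, here is a minimal sketch of the kind of setup I mean, using the spark-streaming-kafka-0-10 direct stream integration; the broker address, topic name, group id and batch interval are placeholders:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    object DirectStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("kafka-partition-demo")
        val ssc = new StreamingContext(conf, Seconds(5))          // placeholder batch interval

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",                // placeholder broker
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "demo-group",                             // placeholder group id
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](Array("my-topic"), kafkaParams)  // placeholder topic
        )

        // Each batch's RDD has exactly one Spark partition per Kafka partition of the topic.
        stream.foreachRDD { rdd =>
          println(s"RDD partitions this batch: ${rdd.getNumPartitions}")
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }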

So if I have 1 partition in the topic, and 1 executor core, that core will sequentially read from Kafka.

What happens if I have:

  • 2 partitions in the topic and only 1 executor core? Will that core read first from one partition and then from the second one, so there will be no benefit in partitioning the topic?

  • 2 partitions in the topic and 2 cores? Will 1 executor core then read from 1 partition, and the second core from the second partition?

  • 1 kafka partition and 2 executor cores?

Thank you.

asked Jun 14 '16 by Srdjan Nikitovic

1 Answer

The basic rule is that you can scale up to the number of Kafka partitions. If you set spark.executor.cores to more than the number of partitions, some of the threads will be idle. If it is less than the number of partitions, Spark will have threads read from one partition and then from another. So:

  1. 2 partitions, 1 executor core: reads from one partition and then the other (I am not sure how Spark decides how much to read from each before switching).

  2. 2 partitions, 2 cores: parallel execution.

  3. 1 partition, 2 cores: one thread is idle.

For case #1, note that having more partitions than executors is OK, since it allows you to scale out later without having to re-partition. The trick is to make sure that the number of partitions is evenly divisible by the number of executor cores. Spark has to process all the partitions before passing data on to the next step in the pipeline, so any 'remainder' partitions slow down processing. For example, with 5 partitions and 4 threads, processing takes the time of 2 partitions: 4 are processed at once, then one thread runs the 5th partition by itself.
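
As a rough illustration (assuming a topic with 4 partitions; the app name, master URL and memory size are placeholders), you would size the job so the total core count divides the partition count evenly, e.g. via spark.cores.max in standalone mode:

    import org.apache.spark.SparkConf

    // Sketch only: match total cores to the Kafka partition count (assumed to be 4 here),
    // so each thread reads exactly one partition and no 'remainder' partitions are left over.
    val conf = new SparkConf()
      .setAppName("kafka-partition-demo")
      .setMaster("spark://master-host:7077")  // placeholder standalone master
      .set("spark.cores.max", "4")            // total cores across the cluster (standalone mode)
      .set("spark.executor.memory", "2g")     // placeholder executor memory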

Note that you may also see better processing throughput if you keep the number of partitions/RDDs the same throughout the pipeline, by explicitly setting the number of data partitions in functions like reduceByKey().
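
For example (a plain word count standing in for the streaming pipeline; the names and the partition count of 4 are made up), passing numPartitions explicitly keeps the partition count stable across the shuffle instead of falling back to the default parallelism:

    import org.apache.spark.{SparkConf, SparkContext}

    object PartitionCountSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("partition-count-demo").setMaster("local[4]"))

        // Assume the upstream (Kafka-backed) RDD had 4 partitions; keep that count through the shuffle.
        val numPartitions = 4
        val words = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numPartitions)

        val counts = words
          .map(w => (w, 1))
          .reduceByKey(_ + _, numPartitions)  // explicit partition count for the shuffled RDD

        println(counts.getNumPartitions)      // 4
        counts.collect().foreach(println)

        sc.stop()
      }
    }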

answered Sep 23 '22 by AngerClown