set spark.streaming.kafka.maxRatePerPartition for createDirectStream

I need to increase the input rate per partition for my application, and I have used .set("spark.streaming.kafka.maxRatePerPartition", 100) in the config. The stream duration is 10s, so I expect to process 5*100*10=5000 messages for this batch. However, the input rate I receive is only about 500. Can you suggest any modifications to increase this rate?

asked Dec 07 '16 16:12 by innovatism


People also ask

What is Spark streaming Kafka maxRatePerPartition?

spark.streaming.kafka.maxRatePerPartition is the maximum rate (in messages per second) at which each Kafka partition will be read by the direct API.

How can I improve my Spark streaming speed?

Increase driver and executor memory. Out-of-memory issues and random crashes of the application were solved by increasing the memory from 20g to 40g per executor, and by giving the driver 40g as well.

What is the primary difference between Kafka streams and Spark streaming?

Kafka Streams processes events as they unfold, so it uses a continuous (event-at-a-time) processing model. Spark Streaming, on the other hand, uses a micro-batch approach, which divides incoming streams into small batches for processing.
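
To make the micro-batch idea concrete, here is a minimal, hypothetical Scala sketch that buckets timestamped events into fixed-length batches by batch index. It is only a conceptual illustration, not how Spark Streaming is implemented; `Event`, `toMicroBatches` and the millisecond timestamps are assumptions made for the example.

```scala
// Conceptual sketch only (not Spark's implementation): group timestamped
// events into fixed-length micro-batches instead of handling each event
// as soon as it arrives. All names here are hypothetical.
object MicroBatchSketch {
  final case class Event(timestampMs: Long, payload: String)

  // Returns (batchStartMs, eventsInBatch) pairs, ordered by batch start time.
  def toMicroBatches(events: Seq[Event], batchMs: Long): Seq[(Long, Seq[Event])] = {
    require(batchMs > 0, "batch duration must be positive")
    events
      .groupBy(e => e.timestampMs / batchMs) // batch index for each event
      .toSeq
      .sortBy(_._1)                          // emit batches in time order
      .map { case (idx, batch) => (idx * batchMs, batch.sortBy(_.timestampMs)) }
  }
}
```

With a 10-second batch (`batchMs = 10000`), events at 1200 ms and 9800 ms land in the batch starting at 0, while an event at 10500 ms starts the next batch at 10000.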


1 Answer

The stream duration is 10s, so I expect to process 5*100*10=5000 messages for this batch.

That's not what the setting means. It means "how many elements each partition can have per batch", not per second. I'm going to assume you have 5 partitions, so you're getting 5 * 100 = 500. If you want 5000, set maxRatePerPartition to 1000.

From "Exactly-once Spark Streaming From Apache Kafka" (written by Cody Koeninger, the author of the Direct Stream approach, emphasis mine):

For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch.

Edit:

After @avrs comment, I looked inside the code which defines the max rate. As it turns out, the heuristic is a bit more complex than stated in both the blog post and the docs.

There are two branches. If backpressure is enabled alongside maxRate, the effective rate is the minimum of the current backpressure rate calculated by the RateEstimator object and the maxRate set by the user. If backpressure isn't enabled, the user-defined maxRate is used as is.
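
The two branches described above can be modelled with a small, simplified Scala function. This is a sketch of the answer's description, not Spark's actual code, which has more detail; the function name and parameters are hypothetical, and treating a maxRatePerPartition of 0 as "no limit" is an assumption of this sketch.

```scala
// Simplified model of the rate-selection branches; not Spark's code.
// Assumption: maxRatePerPartition <= 0 means "no user limit".
object RateSelectionSketch {
  def effectiveRatePerPartition(
      backpressureEnabled: Boolean,
      estimatedRate: Option[Double], // rate suggested by the RateEstimator, if any
      maxRatePerPartition: Long      // user setting, messages per second
  ): Option[Double] = {
    val userCap  = if (maxRatePerPartition > 0) Some(maxRatePerPartition.toDouble) else None
    val estimate = if (backpressureEnabled) estimatedRate.filter(_ > 0) else None
    (estimate, userCap) match {
      case (Some(est), Some(cap)) => Some(math.min(est, cap)) // backpressure + maxRate: smaller wins
      case (Some(est), None)      => Some(est)                // backpressure only
      case (None, cap)            => cap                      // no estimate: maxRate as is (or none)
    }
  }
}
```

For example, with backpressure on, an estimate of 250 msg/s and maxRate 100, the effective rate is 100; with an estimate of 80 it is 80. With backpressure off, the user's 100 is used regardless of any estimate.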

Then, after selecting the rate, it always multiplies it by the batch duration in seconds, which effectively makes the setting a rate per second:

if (effectiveRateLimitPerPartition.values.sum > 0) {
  val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
  Some(effectiveRateLimitPerPartition.map {
    case (tp, limit) => tp -> (secsPerBatch * limit).toLong
  })
} else {
  None
}
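
The quoted snippet can be re-created as a standalone Scala function to check the arithmetic from the question. This is a hedged sketch: partitions are plain `String` keys instead of Spark's `TopicPartition`, and the hypothetical `batchDurationMs` parameter stands in for `context.graph.batchDuration.milliseconds`.

```scala
// Standalone re-creation of the quoted logic: convert a per-second limit for
// each partition into a per-batch message cap. String keys and the
// batchDurationMs parameter are simplifications for this sketch.
object BatchCapSketch {
  def maxMessagesPerPartition(
      effectiveRateLimitPerPartition: Map[String, Double], // messages per second
      batchDurationMs: Long
  ): Option[Map[String, Long]] = {
    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = batchDurationMs.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None // no positive limit anywhere: batch size is not capped
    }
  }
}
```

With the question's numbers (assuming 5 partitions, as above), a limit of 100 per partition and a 10-second batch gives a cap of 1000 messages per partition, 5000 per batch in total, which matches the original expectation of 5*100*10.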
answered Sep 21 '22 23:09 by Yuval Itzchakov