I need to increase the input rate per partition for my application, and I have used .set("spark.streaming.kafka.maxRatePerPartition",100) for the config. The stream duration is 10s, so I expect to process 5*100*10=5000 messages for this batch. However, the input rate I received is only about 500. Can you suggest any modifications to increase this rate?
An important one is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API. Deploying: This is the same as the first approach.
Increase driver and executor memory: out-of-memory issues and random crashes of the application were solved by increasing the memory from 20g per executor to 40g per executor, as well as using 40g for the driver.
Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.
The stream duration is 10s so I expect process 5*100*10=5000 messages for this batch.
That's not what the setting means. It means "how many elements each partition can have per batch", not per second. I'm going to assume you have 5 partitions, so you're getting 5 * 100 = 500. If you want 5000, set maxRatePerPartition to 1000.
From "Exactly-once Spark Streaming From Apache Kafka" (written by Cody, the author of the Direct Stream approach; emphasis mine):
For rate limiting, you can use the Spark configuration variable spark.streaming.kafka.maxRatePerPartition to set the maximum number of messages per partition per batch.
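For reference, here is a minimal sketch of how this setting is usually applied alongside the batch interval. It assumes the Spark Streaming libraries are on the classpath; the application name and the local master are hypothetical placeholders, not taken from the question. Note that in Scala, SparkConf.set takes the value as a string.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RateLimitConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("rate-limit-sketch") // hypothetical name, for illustration only
      .setMaster("local[2]")           // assumption: local run, not a real cluster
      // Cap applied to each Kafka partition read by the direct stream.
      .set("spark.streaming.kafka.maxRatePerPartition", "100")

    // 10-second batch interval, matching the stream duration in the question.
    val ssc = new StreamingContext(conf, Seconds(10))

    println(conf.get("spark.streaming.kafka.maxRatePerPartition"))

    // No stream is defined in this sketch, so just release the resources.
    ssc.stop()
  }
}
```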
After @avrs's comment, I looked at the code that defines the max rate. As it turns out, the logic is a bit more complex than stated in both the blog post and the docs.
There are two branches. If backpressure is enabled alongside maxRate, the effective rate is the minimum of the current backpressure rate calculated by the RateEstimator object and the maxRate set by the user. If it isn't enabled, the maxRate is used as is.
Now, after selecting the rate, it is always multiplied by the batch duration in seconds, effectively making this a rate per second:
    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
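To make the arithmetic concrete, the two branches and the final multiplication can be sketched in plain Scala, without any Spark dependency. This is a simplified illustration, not Spark's actual implementation: the names RateLimitSketch, effectiveRate and maxMessagesPerBatch are hypothetical, and the sketch assumes the backpressure estimate is already expressed per partition and that every partition gets the same limit.

```scala
object RateLimitSketch {
  // Per-partition rate (messages per second) that ends up being applied.
  // Assumption: backpressureRate, when present, is already a per-partition value.
  def effectiveRate(maxRatePerPartition: Long, backpressureRate: Option[Long]): Long =
    backpressureRate match {
      case Some(estimated) => math.min(estimated, maxRatePerPartition)
      case None            => maxRatePerPartition
    }

  // Upper bound on messages per batch across all partitions:
  // partitions * (seconds per batch * per-partition rate).
  def maxMessagesPerBatch(
      partitions: Int,
      maxRatePerPartition: Long,
      batchMillis: Long,
      backpressureRate: Option[Long] = None): Long = {
    val secsPerBatch = batchMillis.toDouble / 1000
    val perPartition =
      (secsPerBatch * effectiveRate(maxRatePerPartition, backpressureRate)).toLong
    partitions * perPartition
  }

  def main(args: Array[String]): Unit = {
    // 5 partitions, limit 100, 10 s batches, no backpressure.
    println(maxMessagesPerBatch(5, 100L, 10000L))             // 5000
    // Same setup, but a backpressure estimate of 40 msg/s per partition wins.
    println(maxMessagesPerBatch(5, 100L, 10000L, Some(40L)))  // 2000
  }
}
```

Under these assumptions, the numbers from the question (5 partitions, a limit of 100, 10-second batches) give a cap of 5000 messages per batch, and an active backpressure estimate below the configured limit lowers that cap.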