So I'm performing multiple operations on the same RDD in a Kafka stream. Is caching that RDD going to improve performance?
When running multiple operations on the same DStream, cache will substantially improve performance. This can be observed in the Spark UI.
Without the use of cache, each iteration on the DStream takes the same time, so the total time to process the data in each batch interval grows linearly with the number of iterations on the data.
When cache is used, the first time the transformation pipeline on the RDD is executed the RDD is cached, and every subsequent iteration on that RDD takes only a fraction of the time to execute. (In that test, the execution time of the same job was further reduced from 3s to 0.4s by reducing the number of partitions.)
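As a rough, self-contained sketch of that scenario (the socket source, the Event type and the id values are placeholders standing in for your Kafka stream and real data):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical element type, only to give the filters below something to match on.
case class Event(id: Int, payload: String)

val conf = new SparkConf().setAppName("dstream-cache-example")
val ssc  = new StreamingContext(conf, Seconds(10))

// 'events' stands in for the Kafka-backed DStream; a socket source keeps the sketch self-contained.
val events = ssc.socketTextStream("localhost", 9999)
  .map(line => Event(math.abs(line.hashCode) % 3, line))

// Without this call, every action below re-runs the whole pipeline from the source;
// with it, each batch is computed once and reused by the subsequent iterations.
events.cache()

Seq(0, 1, 2).foreach { id =>
  events.filter(_.id == id).print()
}

ssc.start()
ssc.awaitTermination()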
Instead of using dstream.cache, I would recommend using dstream.foreachRDD or dstream.transform to gain direct access to the underlying RDD and apply the persist operation there. We use matching persist and unpersist calls around the iterative code to free the memory as soon as possible:
dstream.foreachRDD { rdd =>
  rdd.cache()  // materialise the RDD once for the iterations below
  col.foreach { id =>  // 'col' is the collection of ids being iterated over
    rdd.filter(elem => elem.id == id).map(...).saveAs...
  }
  rdd.unpersist(true)  // release the memory as soon as this batch is done
}
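For completeness, here is a self-contained variant of that pattern, assuming dstream is a DStream[Record]; the Record type, the ids collection and the output path are made-up placeholders, and an explicit StorageLevel is used instead of the cache() shorthand:

import org.apache.spark.storage.StorageLevel

case class Record(id: Int, value: String)  // hypothetical element type
val ids = Seq(1, 2, 3)                     // hypothetical collection of ids to iterate over

dstream.foreachRDD { rdd =>
  // Persist once; every filter below reuses the materialised partitions.
  rdd.persist(StorageLevel.MEMORY_ONLY)

  ids.foreach { id =>
    rdd.filter(_.id == id)
       .map(r => r.value)
       .saveAsTextFile(s"/tmp/output/id=$id")  // placeholder output location
  }

  // Blocking unpersist: the memory is released before the next batch starts.
  rdd.unpersist(blocking = true)
}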
Otherwise, one needs to wait for the time configured in spark.cleaner.ttl to clear up the memory. Note that the default value of spark.cleaner.ttl is infinite, which is not recommended for a production 24x7 Spark Streaming job.
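If you do rely on that periodic cleaner instead, the setting goes on the SparkConf; a minimal sketch, assuming an older Spark release where spark.cleaner.ttl is still supported (it was dropped in later versions):

import org.apache.spark.SparkConf

// spark.cleaner.ttl is given in seconds; here metadata older than one hour is cleaned up.
val conf = new SparkConf()
  .setAppName("streaming-job")
  .set("spark.cleaner.ttl", "3600")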
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small “hot” dataset or when running an iterative algorithm like PageRank.
https://spark.apache.org/docs/latest/quick-start.html#caching
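For reference, a quick-start-style snippet of that cluster-wide caching, as run in spark-shell where spark is the SparkSession; the file path is just an example:

val textFile = spark.read.textFile("README.md")  // placeholder input file
val linesWithSpark = textFile.filter(line => line.contains("Spark"))

linesWithSpark.cache()   // mark the "hot" dataset for caching
linesWithSpark.count()   // first action materialises the cache
linesWithSpark.count()   // subsequent actions read from memory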