I have a data set which I am accessing from Spark (e.g. a Parquet file) that contains a large number of rows. I need to send some data from these rows to an external service, and I need to batch them so that each call to the external service contains a certain number of rows (say, 1000 rows per batch). Basically what take(n) does, but done repeatedly, iteratively, over a large data set. What is a good way to perform such a task? I guess it can be done with foreach() and aggregating data in batches manually, but I wonder if there are any built-in/recommended ways of doing so.
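For reference, a rough sketch of the manual aggregation I have in mind (it has to happen per partition, since the closure runs on the executors; df and sendBatch are placeholders for my DataFrame and for the call to the external service):

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row

df.rdd.foreachPartition { rows =>
  val buffer = ArrayBuffer.empty[Row]
  rows.foreach { row =>
    buffer += row
    if (buffer.size == 1000) {     // 1000 rows per call, as described above
      sendBatch(buffer.toSeq)      // hypothetical call to the external service
      buffer.clear()
    }
  }
  if (buffer.nonEmpty) sendBatch(buffer.toSeq) // flush the last, partial batch
}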
I am not aware of any built-in or recommended option, but the simple solution is to combine the RDD API and the Scala Iterator API. If the operation you apply is idempotent, you can do it directly from the workers:
val batchSize: Int = ???
val rdd: RDD[T] = ???
def doSomething(xs: Seq[T]) = ???
rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething))
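Iterator.grouped is what does the batching here; a quick illustration of its behaviour on a plain Scala iterator:

// grouped(n) lazily chunks an iterator into batches of at most n elements;
// only the last batch can be smaller than n.
(1 to 7).iterator.grouped(3).foreach(batch => println(batch.mkString(", ")))
// prints:
// 1, 2, 3
// 4, 5, 6
// 7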
Otherwise you can fetch a single partition at a time to the driver:
rdd.cache
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething)
Please note that this requires a separate job for each partition, hence it is a good idea to cache the input RDD first to avoid recomputation.
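For completeness, a sketch of the full driver-side loop with an explicit unpersist once every batch has been sent (same placeholder names as above):

rdd.cache()               // avoid recomputing the RDD for every per-partition job
rdd.toLocalIterator
  .grouped(batchSize)     // each group holds at most batchSize elements
  .foreach(doSomething)
rdd.unpersist()           // release the cached blocks when done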
In Python you can use the toolz library as a replacement for the Iterator API:
from toolz.itertoolz import partition_all
rdd.foreachPartition(
    lambda it: [doSomething(xs) for xs in partition_all(batchSize, it)])
or
for xs in partition_all(batchSize, rdd.toLocalIterator()):
    doSomething(xs)
When you create a DataFrame from a Parquet file, it gets partitioned based on HDFS block locations.
So the first question to ask is whether you can write your data set to the external service in parallel, i.e. send batches of 1000 rows from multiple servers simultaneously.
If this is OK, then the most efficient way to do it is the foreachPartition function. Something like:
df.rdd.foreachPartition { it =>
  it.grouped(1000).foreach(sendBatch)
}
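What sendBatch does depends entirely on the service; purely as an illustration, here is one possible shape of it that POSTs the rows over HTTP (the endpoint URL and the payload format are made up):

import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import org.apache.spark.sql.Row

def sendBatch(rows: Seq[Row]): Unit = {
  // Naive line-per-row payload; adapt to whatever the service expects.
  val payload = rows.map(_.mkString(",")).mkString("\n")
  val request = HttpRequest.newBuilder(URI.create("https://example.com/ingest"))
    .header("Content-Type", "text/plain")
    .POST(HttpRequest.BodyPublishers.ofString(payload))
    .build()
  // One client per batch keeps the sketch simple; reusing a client per
  // partition (or per executor) would be cheaper.
  HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
}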
If your external service cannot be used this way, then the second best option would be toLocalIterator:
df.rdd.toLocalIterator.grouped(1000).foreach(sendBatch)
Note that this solution is significantly less efficient, as it serializes each partition and transfers it from the executors to the driver.
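As with the RDD version above, toLocalIterator triggers a separate job per partition, so it can be worth persisting the DataFrame first and releasing it once all batches have been sent:

df.persist()
df.rdd.toLocalIterator.grouped(1000).foreach(sendBatch)
df.unpersist()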