
Iterated take() or batch processing for Spark?

Tags: apache-spark

I have a data set which I am accessing from Spark (e.g. a Parquet file) that contains a large number of rows. I need to send some data from these rows to an external service, and I need to batch them so that each call to the external service contains a certain number of rows (say, 1000 rows per batch). Basically what take(n) does, but repeatedly and iteratively, over a large data set. What is a good way to perform such a task? I guess it could be done with foreach() and aggregating data into batches manually, roughly as sketched below, but I wonder if there are any built-in/recommended ways of doing so.
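A minimal sketch of that manual approach, assuming `df` is the DataFrame read from the Parquet file and `sendToService` is a hypothetical stand-in for the external call:

import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer

def sendToService(rows: Seq[Row]): Unit = ???  // hypothetical external call

df.rdd.foreachPartition { rows =>
  // Accumulate rows into a buffer and flush every 1000 rows,
  // plus once more at the end for any remainder.
  val buffer = ArrayBuffer.empty[Row]
  rows.foreach { row =>
    buffer += row
    if (buffer.size == 1000) {
      sendToService(buffer.toSeq)
      buffer.clear()
    }
  }
  if (buffer.nonEmpty) sendToService(buffer.toSeq)
}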

asked Oct 22 '15 by StasM


2 Answers

I am not aware of any built-in or recommended option, but the simplest solution is to combine the RDD API with the Scala Iterator API. If the operation you apply is idempotent, you can do it directly from the workers:

val batchSize: Int = ???
val rdd: RDD[T] = ???
def doSomething(xs: Seq[T]) = ???

rdd.foreachPartition(_.grouped(batchSize).foreach(doSomething))

Otherwise you can fetch a single partition at a time to the driver:

rdd.cache
rdd.toLocalIterator.grouped(batchSize).foreach(doSomething)

Please note that this requires a separate job for each partition, so it is a good idea to cache the input RDD first to avoid recomputation.

In Python you can use the toolz library as a replacement for the Iterator API:

from toolz.itertoolz import partition_all

rdd.foreachPartition(
  lambda part: [doSomething(xs) for xs in partition_all(batchSize, part)])

or

for xs in partition_all(batchSize, rdd.toLocalIterator()):
    doSomething(xs)
answered Oct 15 '22 by zero323


When you create a DataFrame from a Parquet file, it gets partitioned based on HDFS block locations.
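If batch sizes matter, you can check how many partitions that gives you and repartition before sending; a minimal sketch, assuming `df` is the DataFrame loaded from Parquet and the target partition count is purely illustrative:

// Inspect the partitioning of the DataFrame's underlying RDD.
val numPartitions = df.rdd.partitions.length
println(s"Parquet input split into $numPartitions partitions")

// Optionally repartition before batching (this triggers a shuffle);
// 50 is an arbitrary example value.
val repartitioned = df.repartition(50)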

So the first question to ask is whether you can write your data set to the external service in parallel, i.e. send batches of 1000 rows from multiple workers simultaneously.

If this is OK, then the most efficient way to do it is with the foreachPartition function. Something like:

df.rdd.foreachPartition { it =>
  it.grouped(1000).foreach(sendBatch)
}

If your external service cannot be used this way, then the second best option would be toLocalIterator:

df.rdd.toLocalIterator.grouped(1000).foreach(sendBatch)

Note that this solution is significantly less efficient, as it serializes each partition and transfers it from the executors to the driver.

answered Oct 15 '22 by kostya