I would like to know if the foreachPartition
will results in better performance, due to an higher level of parallelism, compared to the foreach
method considering the case in which I'm flowing through an RDD
in order to perform some sums into an accumulator variable.
foreach
and foreachPartitions
are actions.
A generic function for invoking operations with side effects. For each element in the RDD, it invokes the passed function . This is generally used for manipulating accumulators or writing to external stores.
Note: modifying variables other than Accumulators outside of the foreach()
may result in undefined behavior. See Understanding closures for more details.
example :
scala> val accum = sc.longAccumulator("My Accumulator") accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0) scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) ... 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s scala> accum.value res2: Long = 10
Similar to
foreach()
, but instead of invoking function for each element, it calls it for each partition. The function should be able to accept an iterator. This is more efficient thanforeach()
because it reduces the number of function calls (just likemapPartitions
() ).
Usage of foreachPartition
examples:
/** * Insert in to database using foreach partition. * * @param sqlDatabaseConnectionString * @param sqlTableName */ def insertToTable(sqlDatabaseConnectionString: String, sqlTableName: String): Unit = { //numPartitions = number of simultaneous DB connections you can planning to give datframe.repartition(numofpartitionsyouwant) val tableHeader: String = dataFrame.columns.mkString(",") dataFrame.foreachPartition { partition => // Note : Each partition one connection (more better way is to use connection pools) val sqlExecutorConnection: Connection = DriverManager.getConnection(sqlDatabaseConnectionString) //Batch size of 1000 is used since some databases cant use batch size more than 1000 for ex : Azure sql partition.grouped(1000).foreach { group => val insertString: scala.collection.mutable.StringBuilder = new scala.collection.mutable.StringBuilder() group.foreach { record => insertString.append("('" + record.mkString(",") + "'),") } sqlExecutorConnection.createStatement() .executeUpdate(f"INSERT INTO [$sqlTableName] ($tableHeader) VALUES " + insertString.stripSuffix(",")) } sqlExecutorConnection.close() // close the connection so that connections wont exhaust. } }
Usage of foreachPartition
with sparkstreaming (dstreams) and kafka producer
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // only once per partition You can safely share a thread-safe Kafka //producer instance. val producer = createKafkaProducer() partitionOfRecords.foreach { message => producer.send(message) } producer.close() } }
Note : If you want to avoid this way of creating producer once per partition, betterway is to broadcast producer using
sparkContext.broadcast
since Kafka producer is asynchronous and buffers data heavily before sending.
Accumulator samples snippet to play around with it... through which you can test the performance
test("Foreach - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreach(x => accum.add(x)) assert(accum.value == 6L) } test("Foreach partition - Spark") { import spark.implicits._ var accum = sc.longAccumulator sc.parallelize(Seq(1,2,3)).foreachPartition(x => x.foreach(accum.add(_))) assert(accum.value == 6L) }
foreachPartition
operations on partitions so obviously it would be better edge thanforeach
foreachPartition
should be used when you are accessing costly resources such as database connections or kafka producer etc.. which would initialize one per partition rather than one per element(foreach
). when it comes to accumulators you can measure the performance by above test methods, which should work faster in case of accumulators as well..
Also... see map vs mappartitions which has similar concept but they are tranformations.
foreach
auto run the loop on many nodes.
However, sometimes you want to do some operations on each node. For example, make a connection to database. You can not just make a connection and pass it into the foreach
function: the connection is only made on one node.
So with foreachPartition
, you can make a connection to database on each node before running the loop.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With