Spark: How to use mapPartitions and create/close a connection per partition

So, I want to perform certain operations on my Spark DataFrame, write the results to a DB, and create another DataFrame at the end. It looks like this:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    iterator.map(
      row => {
        addRowToBatch(row)
        convertRowToObject(row)
      })
    conn.writeTheBatchToDB()
    conn.close()
  })
  .toDF()

This gives an error because mapPartitions expects a return type of Iterator[NotInferedR], but here it is Unit. I know this is possible with foreachPartition, but I'd like to do the mapping as well. Doing it separately would add overhead (an extra Spark job). What should I do?

Thanks!

asked by void, Apr 11 '16


People also ask

What is the difference between map and mapPartitions in Spark?

mapPartitions() is functionally similar to map(); the difference is that mapPartitions() lets you perform heavy initialization (for example, opening a database connection) once per partition instead of once for every row.
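As a rough sketch of the difference (here rdd is assumed to be an existing RDD[Int], and ExpensiveResource with its transform method is a hypothetical stand-in for any heavyweight dependency):

// Hypothetical heavyweight dependency, e.g. a DB or HTTP client.
class ExpensiveResource {
  def transform(n: Int): Int = n * 2
}

// map(): the body runs once per element, so the resource
// would be constructed for every single row.
val perRow = rdd.map { n =>
  val resource = new ExpensiveResource  // built once per row - wasteful
  resource.transform(n)
}

// mapPartitions(): the body runs once per partition, so the
// resource is constructed once and reused for all of its rows.
val perPartition = rdd.mapPartitions { rows =>
  val resource = new ExpensiveResource  // built once per partition
  rows.map(resource.transform)
}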

How Spark decides number of partitions?

By default, Spark/PySpark creates partitions equal to the number of CPU cores in the machine. The data of each partition resides on a single machine, and Spark creates one task per partition. Shuffle operations move data from one partition to others.
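A quick way to inspect these defaults from a spark-shell session (a sketch, assuming spark is the usual SparkSession):

// typically equals the total number of cores available to the application
println(spark.sparkContext.defaultParallelism)

// an RDD parallelized without an explicit partition count inherits that default
val numbers = spark.sparkContext.parallelize(1 to 1000)
println(numbers.getNumPartitions)

// DataFrame shuffles instead use spark.sql.shuffle.partitions (200 by default)
println(spark.conf.get("spark.sql.shuffle.partitions"))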

How do I change the number of partitions in a Spark data frame?

repartition() can be used to increase or decrease the number of partitions of a Spark DataFrame.
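For example (a sketch reusing myDF from the question; coalesce is included as the cheaper alternative when you only need fewer partitions):

// repartition performs a full shuffle and can increase or decrease the count
val wider = myDF.repartition(100)

// coalesce avoids a full shuffle but can only decrease the count
val narrower = myDF.coalesce(4)

println(wider.rdd.getNumPartitions)    // 100
println(narrower.rdd.getNumPartitions) // 4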


2 Answers

In most cases, eagerly consuming the iterator will cause the job to fail, if not slow it down. So what I did was check whether the iterator is already empty, and then run the cleanup routines.

rdd.mapPartitions(itr => {
    val conn = new DbConnection
    itr.map(data => {
        // process is a placeholder for your per-row logic using data and conn
        val yourActualResult = process(data, conn)
        // the last element has been consumed, so close the connection
        if (itr.isEmpty) conn.close()
        yourActualResult
    })
})

I thought this was a Spark problem at first, but it was actually a Scala one: Iterator.isEmpty is just a negated hasNext, so it can be checked inside the map without consuming any elements. http://www.scala-lang.org/api/2.12.0/scala/collection/Iterator.html#isEmpty:Boolean
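Applied to the code from the question, the pattern would look roughly like this (a sketch reusing the asker's DbConnection, addRowToBatch, and convertRowToObject; the batch is written as the last row is consumed):

val newDF = myDF.mapPartitions { iterator =>
  val conn = new DbConnection
  iterator.map { row =>
    addRowToBatch(row)
    val obj = convertRowToObject(row)
    if (iterator.isEmpty) {   // this row was the partition's last
      conn.writeTheBatchToDB()
      conn.close()
    }
    obj
  }
}.toDF()

One caveat: if a partition happens to be empty, the map body never runs, so the connection opened for it is never closed.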

answered by dansuzuki, Sep 22 '22


The last expression in the anonymous function implementation must be the return value:

import sqlContext.implicits._

val newDF = myDF.mapPartitions(
  iterator => {
    val conn = new DbConnection
    // toList forces eager evaluation, so the mapping happens now, while the connection is still open
    val result = iterator.map(/* the same... */).toList
    conn.writeTheBatchToDB()
    conn.close()
    result.iterator
  }
).toDF()
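Note that toList materializes the entire partition in memory before the write, so this approach trades memory for a simpler connection lifetime than the isEmpty check above.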
answered by Tzach Zohar, Sep 22 '22