
How to use SQLContext and SparkContext inside foreachPartition

I want to use SparkContext and SQLContext inside foreachPartition, but I am unable to do so due to a serialization error. I know that neither object is serializable, but I thought that foreachPartition was executed on the master, where both the SparkContext and the SQLContext are available.

Notation:

`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`

This is my current code (UtilsDM is an object that extends Serializable). The part of the code that fails starts from val schema = ..., where I want to write result to a DataFrame and then save it to Parquet. Maybe the way I organized the code is inefficient, in which case I'd like to hear your recommendations. Thanks.

// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
  df = sqlContext
    .read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)

// Here I process myDStream
myDStream.foreachRDD(rdd => {
  rdd.foreachPartition{iter =>
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
    val producer = UtilsDM.createProducer
    var df = UtilsDM.getDF
    val result = iter.map{ msg =>
        // ... 
        Seq(msg("key"),msg("value"))
    }

    // HERE I WANT TO WRITE result TO S3, BUT IT FAILS
    val schema = StructType(
                    StructField("key", StringType, true) ::
                    StructField("value", StringType, true) :: Nil)
    result.foreach { row =>
       val rdd = sc.makeRDD(row)
       val df2 = sqlContext.createDataFrame(rdd, schema)

       // If the parquet file is not created, then create it
       var df_final: DataFrame = null
       if (df != null) {
          df_final = df.unionAll(df2)
       } else {
          df_final = df2
       }
       df_final.write.parquet("s3n://bucket/pathToSentMessages")
    }
  }
})

EDIT:

I am using Spark 1.6.2 and Scala 2.10.6.

asked Jan 05 '23 by duckertito

2 Answers

It is not possible. SparkContext, SQLContext and SparkSession can be used only on the driver. You can use sqlContext at the top level of foreachRDD:

myDStream.foreachRDD(rdd => {
    val df = sqlContext.createDataFrame(rdd, schema)
    ... 
})

You cannot use it inside a transformation or action:

myDStream.foreachRDD(rdd => {
    rdd.foreach { _ =>
        val df = sqlContext.createDataFrame(...)
        ... 
    }
})

You probably want equivalent of:

myDStream.foreachRDD(rdd => {
   val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter))
   val df = sqlContext.createDataFrame(foo, schema)
   df.write.parquet("s3n://bucket/pathToSentMessages")
})
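
For illustration, here is a minimal sketch of that pattern applied to the code from the question (assuming UtilsDM, RedisClient, df, sqlContext and myDStream are defined as in the question): the Row-building step runs on the executors inside mapPartitions, while the DataFrame creation and the write happen on the driver.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(
  StructField("key", StringType, true) ::
  StructField("value", StringType, true) :: Nil)

myDStream.foreachRDD { rdd =>
  // Runs on the executors: only serializable values are captured here
  val rows = rdd.mapPartitions { iter =>
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort) // per-partition resource, as in the question
    iter.map(msg => Row(msg("key"), msg("value")))
  }

  // Runs on the driver: sqlContext is available here
  val df2 = sqlContext.createDataFrame(rows, schema)
  val dfFinal = if (df != null) df.unionAll(df2) else df2

  // Output path as in the question; pick an appropriate SaveMode for repeated batches
  dfFinal.write.parquet("s3n://bucket/pathToSentMessages")
}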
answered Jan 12 '23 by user6022341

I found out that obtaining the existing SparkContext via SparkContext.getOrCreate() inside the loop works, whereas referencing a SparkContext sc created beforehand does not, i.e.

// this works
stream.foreachRDD( _ => {
    // update rdd
    .... = SparkContext.getOrCreate().parallelize(...)
})

// this doesn't work - throws a SparkContext not serializable error
stream.foreachRDD( _ => {
    // update rdd
    .... = sc.parallelize(...)
})
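
For example, here is a minimal sketch of the working variant, assuming stream is a DStream[String]; the sequence of values and the output path are placeholders, and the only point is that the context is obtained via SparkContext.getOrCreate() inside the closure rather than captured from the enclosing scope.

import org.apache.spark.SparkContext

stream.foreachRDD { rdd =>
  // Look the context up instead of capturing the outer `sc` in the closure
  val extra = SparkContext.getOrCreate().parallelize(Seq("a", "b", "c")) // placeholder data
  val updated = rdd.union(extra)
  updated.saveAsTextFile("s3n://bucket/someOutputPath") // placeholder path
}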
answered Jan 12 '23 by user9395367