I want to use SparkContext and SQLContext inside foreachPartition, but I am unable to do so because of a serialization error. I know that both objects are not serializable, but I thought that foreachPartition is executed on the master, where both SparkContext and SQLContext are available.
Notation:
`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`
This is my current code (UtilsDM is an object that extends Serializable). The part of the code that fails starts at val schema = ..., where I want to write result to a DataFrame and then save it to Parquet. Maybe the way I organized the code is inefficient; in that case I'd like to hear your recommendations. Thanks.
// Here I am creating df from a parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration)
  .exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
  df = sqlContext.read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)
// Here I process myDStream
myDStream.foreachRDD(rdd => {
  rdd.foreachPartition{ iter =>
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
    val producer = UtilsDM.createProducer
    var df = UtilsDM.getDF
    val result = iter.map{ msg =>
      // ...
      Seq(msg("key"), msg("value"))
    }
    // HERE I WANT TO WRITE result TO S3, BUT IT FAILS
    val schema = StructType(
      StructField("key", StringType, true) ::
      StructField("value", StringType, true) :: Nil)
    result.foreach { row =>
      val rdd = sc.makeRDD(row)
      val df2 = sqlContext.createDataFrame(rdd, schema)
      // If the parquet file does not exist yet, create it
      var df_final: DataFrame = null
      if (df != null) {
        df_final = df.unionAll(df2)
      } else {
        df_final = df2
      }
      df_final.write.parquet("s3n://bucket/pathToSentMessages")
    }
  }
})
EDIT:
I am using Spark 1.6.2 and Scala 2.10.6.
It is not possible. SparkContext, SQLContext and SparkSession can be used only on the driver. You can use sqlContext at the top level of foreachRDD:
myDStream.foreachRDD(rdd => {
  val df = sqlContext.createDataFrame(rdd, schema)
  ...
})
You cannot use it inside a transformation / action:
myDStream.foreachRDD(rdd => {
  rdd.foreach {
    val df = sqlContext.createDataFrame(...)
    ...
  }
})
You probably want an equivalent of:
myDStream.foreachRDD(rdd => {
  val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter))
  val df = sqlContext.createDataFrame(foo, schema)
  df.write.parquet("s3n://bucket/pathToSentMessages")
})
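For reference, a more complete sketch of that pattern could look like the code below. This is only a sketch under the question's assumptions: msg carries "key" and "value" entries, RedisClient and the UtilsDM helpers are the ones from the question, and writing with append mode stands in for the manual unionAll against the previously loaded DataFrame.
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(
  StructField("key", StringType, true) ::
  StructField("value", StringType, true) :: Nil)

myDStream.foreachRDD { rdd =>
  // Executor side: per-partition resources; only serializable values are captured here
  val rows = rdd.mapPartitions { iter =>
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
    // ... use r to process/enrich each msg here ...
    iter.map(msg => Row(msg("key"), msg("value")))
  }
  // Driver side: sqlContext is safe to use here
  val df = sqlContext.createDataFrame(rows, schema)
  df.write.mode("append").parquet("s3n://bucket/pathToSentMessages")
}
Appending on each batch avoids having to re-read and union with the previously written DataFrame.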
I found out that using an existing SparkContext (assuming I have created a SparkContext sc beforehand) inside a loop works, i.e.
// this works
stream.foreachRDD( _ => {
  // update rdd
  .... = SparkContext.getOrCreate().parallelize(...)
})

// this doesn't work - throws a SparkContext not serializable error
stream.foreachRDD( _ => {
  // update rdd
  .... = sc.parallelize(...)
})
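To make that concrete, here is a minimal sketch of the working variant. The lookup data is just a placeholder; the point is that SparkContext.getOrCreate() resolves the active context at execution time inside the foreachRDD body instead of closing over the outer sc.
import org.apache.spark.SparkContext

stream.foreachRDD { rdd =>
  // Resolve the active context here rather than capturing `sc` in the closure
  val context = SparkContext.getOrCreate()
  val lookup = context.parallelize(Seq("a", "b", "c"))   // placeholder side data
  // ... combine `lookup` with `rdd` on the driver side ...
}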