I want to use `SparkContext` and `SQLContext` inside `foreachPartition`, but I am unable to do so because of a serialization error. I know that neither object is serializable, but I thought that `foreachPartition` is executed on the master, where both `SparkContext` and `SQLContext` are available.
Notation:
`msg -> Map[String,String]`
`result -> Iterable[Seq[Row]]`
This is my current code (`UtilsDM` is an object that extends `Serializable`). The part that fails starts at `val schema = ...`, where I want to write `result` to a DataFrame and then save it to Parquet. Maybe the way I organized the code is inefficient; if so, I'd like to hear your recommendations. Thanks.
// Here I am creating df from parquet file on S3
val exists = FileSystem.get(new URI("s3n://" + bucketNameCode), sc.hadoopConfiguration).exists(new Path("s3n://" + bucketNameCode + "/" + pathToSentMessages))
var df: DataFrame = null
if (exists) {
  df = sqlContext
    .read.parquet("s3n://bucket/pathToParquetFile")
}
UtilsDM.setDF(df)

// Here I process myDStream
myDStream.foreachRDD(rdd => {
  rdd.foreachPartition { iter =>
    val r = new RedisClient(UtilsDM.getHost, UtilsDM.getPort)
    val producer = UtilsDM.createProducer
    var df = UtilsDM.getDF
    val result = iter.map { msg =>
      // ...
      Seq(msg("key"), msg("value"))
    }

    // HERE I WANT TO WRITE result TO S3, BUT IT FAILS
    val schema = StructType(
      StructField("key", StringType, true) ::
      StructField("value", StringType, true) :: Nil)

    result.foreach { row =>
      val rdd = sc.makeRDD(row)
      val df2 = sqlContext.createDataFrame(rdd, schema)
      // If the parquet file is not created, then create it
      var df_final: DataFrame = null
      if (df != null) {
        df_final = df.unionAll(df2)
      } else {
        df_final = df2
      }
      df_final.write.parquet("s3n://bucket/pathToSentMessages")
    }
  }
})
EDIT:
I am using Spark 1.6.2 and Scala 2.10.6.
It is not possible. `SparkContext`, `SQLContext` and `SparkSession` can be used only on the driver. You can use `sqlContext` at the top level of `foreachRDD`:
myDStream.foreachRDD(rdd => {
  val df = sqlContext.createDataFrame(rdd, schema)
  ...
})
You cannot use it inside a transformation or an action, because that code runs on the executors:
myDStream.foreachRDD(rdd => {
  rdd.foreach { record =>
    val df = sqlContext.createDataFrame(...)
    ...
  }
})
You probably want the equivalent of:
myDStream.foreachRDD(rdd => {
  val foo = rdd.mapPartitions(iter => doSomethingWithRedisClient(iter))
  val df = sqlContext.createDataFrame(foo, schema)
  df.write.parquet("s3n://bucket/pathToSentMessages")
})
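The pattern above could be fleshed out roughly as follows. This is only a sketch against the Spark 1.6 API, not the asker's actual code: the object `SaveMessages` and the helper `toRows` are hypothetical names, `toRows` stands in for `doSomethingWithRedisClient`, and writing with `SaveMode.Append` is an assumption swapped in for the question's read, `unionAll` and rewrite cycle.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper object; the names are illustrative only.
object SaveMessages {
  // Schema for the two string columns used in the question.
  val schema: StructType = StructType(Seq(
    StructField("key", StringType, nullable = true),
    StructField("value", StringType, nullable = true)))

  // Runs on the executors: plain Scala only, no SparkContext or SQLContext.
  // Stand-in for doSomethingWithRedisClient from the answer.
  def toRows(iter: Iterator[Map[String, String]]): Iterator[Row] =
    iter.map(msg => Row(msg("key"), msg("value")))

  // The body of foreachRDD runs on the driver, so sqlContext is usable here.
  def save(stream: DStream[Map[String, String]],
           sqlContext: SQLContext,
           path: String): Unit =
    stream.foreachRDD { rdd =>
      val rows: RDD[Row] = rdd.mapPartitions(toRows)
      if (!rows.isEmpty()) {
        // Assumption: append each batch instead of unioning with the old data.
        sqlContext.createDataFrame(rows, schema)
          .write.mode(SaveMode.Append).parquet(path)
      }
    }
}
```

The per-partition work stays inside `mapPartitions`, while building the DataFrame and writing Parquet happen once per batch on the driver.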
I found out that obtaining the existing SparkContext through `SparkContext.getOrCreate()` inside the loop works, while referring to the variable directly does not (assume a SparkContext `sc` was created beforehand), i.e.:
// this works
stream.foreachRDD( _ => {
  // update rdd
  .... = SparkContext.getOrCreate().parallelize(...)
})
// this doesn't work - throws a SparkContext not serializable error
stream.foreachRDD( _ => {
  // update rdd
  .... = sc.parallelize(...)
})
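One plausible reason for the difference, offered here as an assumption rather than something the answer states: the second closure captures the driver's `sc` variable, so whenever that closure is serialized (for example when streaming checkpointing is enabled) the non-serializable context is dragged along with it. The first closure captures nothing and looks the context up only when it runs on the driver. The plain-Scala sketch below reproduces the effect without Spark; `FakeContext` and `ClosureDemo` are hypothetical stand-ins, not Spark classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: deliberately not Serializable.
class FakeContext {
  def parallelize(n: Int): Seq[Int] = 1 to n
}

// Hypothetical stand-in for SparkContext.getOrCreate(): a singleton lookup.
object FakeContext {
  private lazy val instance = new FakeContext
  def getOrCreate(): FakeContext = instance
}

object ClosureDemo {
  // Returns true if Java serialization accepts the object.
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The closure captures the local context, like using `sc` directly.
  def capturing(): Int => Seq[Int] = {
    val sc = new FakeContext
    (n: Int) => sc.parallelize(n)
  }

  // The closure captures nothing; the context is looked up when it runs.
  def lookingUp(): Int => Seq[Int] =
    (n: Int) => FakeContext.getOrCreate().parallelize(n)
}
```

Here `ClosureDemo.serializes(ClosureDemo.capturing())` is false while `ClosureDemo.serializes(ClosureDemo.lookingUp())` is true. The lookup is only meaningful on the driver, which is where the body of `foreachRDD` runs; it does not make a SparkContext available on the executors.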