How can I convert RDD to DataFrame in Spark Streaming, not just Spark?
I saw this example, but it requires a SparkContext:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
In my case I have a StreamingContext. Should I then create a SparkContext inside foreach? It looks too crazy... So, how do I deal with this issue? My final goal (in case it's useful) is to save the DataFrame to Amazon S3 using rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json"), which (as far as I know) is not possible for an RDD without converting it to a DataFrame first. Here is what creating the context inside foreachRDD would look like:
myDstream.foreachRDD { rdd =>
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
}
(Background: a Spark RDD can be converted to a DataFrame in three ways: with toDF() after importing the implicits, with createDataFrame() on the SQLContext (or, from Spark 2.x, the SparkSession), or by mapping it to an RDD[Row] and applying an explicit schema. PySpark offers the same toDF() method on its RDDs.)
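A minimal sketch of all three approaches, assuming an existing SparkContext sc and a hypothetical Person case class:
case class Person(name: String, age: Int)

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

val people = sc.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))

// 1. toDF() on an RDD of case classes (requires the implicits import)
val df1 = people.toDF()

// 2. createDataFrame() straight from the same RDD
val df2 = sqlContext.createDataFrame(people)

// 3. map to RDD[Row] and attach an explicit schema
val rows = people.map(p => Row(p.name, p.age))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)))
val df3 = sqlContext.createDataFrame(rows, schema)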
Create the sqlContext outside foreachRDD. Once you have converted the RDD to a DataFrame using that sqlContext, you can write it to S3.
For example:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

// local[2]: a local streaming app needs at least two threads
val conf = new SparkConf().setMaster("local[2]").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
myDstream.foreachRDD { rdd =>
  val df = rdd.toDF()
  // DataFrameWriter has no saveAsTextFile; use save(). Append mode keeps
  // later micro-batches from failing because the path already exists.
  df.write.mode(SaveMode.Append).format("json").save("s3://iiiii/ttttt.json")
}
Update:
You can even create the sqlContext inside foreachRDD, since the body of foreachRDD executes on the driver.
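If you do go that route, the Spark Streaming programming guide suggests a lazily instantiated singleton; SQLContext.getOrCreate (available since Spark 1.5) does exactly that, so you don't construct a fresh context per batch. A sketch, reusing the stream and S3 path from above:
import org.apache.spark.sql.{SQLContext, SaveMode}

myDstream.foreachRDD { rdd =>
  // getOrCreate returns a singleton SQLContext instead of building
  // a new one for every micro-batch
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  rdd.toDF().write.mode(SaveMode.Append).format("json").save("s3://iiiii/ttttt.json")
}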