How can I convert an RDD to a DataFrame in Spark Streaming, not just in plain Spark?
I saw this example, but it requires a SparkContext:
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
rdd.toDF()
In my case I have a StreamingContext. Should I then create a SparkContext inside foreachRDD? That seems too crazy... So, how do I deal with this issue? My final goal (in case it is useful) is to save each DataFrame to Amazon S3 with rdd.toDF.write.format("json").save("s3://iiiii/ttttt.json"), which (as far as I know) is not possible for an RDD without converting it to a DataFrame first. Here is what the "crazy" version would look like:
myDstream.foreachRDD { rdd =>
  val conf = new SparkConf().setMaster("local").setAppName("My App")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._
  rdd.toDF()
}
Create the sqlContext outside foreachRDD. Once you have converted the rdd to a DataFrame with that sqlContext, you can write it to S3.
For example:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

myDstream.foreachRDD { rdd =>
  val df = rdd.toDF()
  // DataFrameWriter has no saveAsTextFile; save() writes the JSON output
  df.write.format("json").save("s3://iiiii/ttttt.json")
}
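Note that this example writes every batch to the same S3 path, so the second batch interval will collide with the first unless you overwrite. One common way around that (a sketch on my part, not from the original answer; the path is just an example) is to use the two-argument form of foreachRDD, which passes in the batch Time:

import org.apache.spark.streaming.Time

// assumes the sqlContext and implicits from the example above are in scope
myDstream.foreachRDD { (rdd, time: Time) =>
  val df = rdd.toDF()
  // one output directory per batch, keyed by the batch timestamp
  df.write.format("json").save(s"s3://iiiii/ttttt-${time.milliseconds}")
}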
Update: You can even create the sqlContext inside foreachRDD, because the body of foreachRDD executes on the driver.
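A minimal sketch of that pattern, assuming Spark 1.5+ (where SQLContext.getOrCreate exists) and a DStream of case-class instances so that toDF() is available; getOrCreate reuses a single SQLContext across batches instead of constructing a new one each time:

import org.apache.spark.sql.SQLContext

myDstream.foreachRDD { rdd =>
  // runs on the driver; reuses the SQLContext backed by this RDD's SparkContext
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  rdd.toDF().write.format("json").save("s3://iiiii/ttttt.json")
}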