
How to convert RDD to DataFrame in Spark Streaming, not just Spark

How can I convert RDD to DataFrame in Spark Streaming, not just Spark?

I saw this example, but it requires a SparkContext:

val sqlContext = new SQLContext(sc)  // needs an existing SparkContext (sc)
import sqlContext.implicits._        // brings toDF() into scope
rdd.toDF()

In my case I have a StreamingContext. Should I then create a SparkContext inside foreach? That seems crazy... So how do I deal with this issue? My final goal (in case it is useful to know) is to save the DataFrame in Amazon S3 using rdd.toDF.write.format("json").saveAsTextFile("s3://iiiii/ttttt.json"), which, as far as I know, is not possible for an RDD without converting it to a DataFrame first.

myDstream.foreachRDD { rdd =>
    // creating a new SparkContext for every batch -- this is the part that seems wrong
    val conf = new SparkConf().setMaster("local").setAppName("My App")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    rdd.toDF()
}
asked Oct 12 '16 by Lobsterrrr


People also ask

How do you convert a Spark RDD into a DataFrame?

A Spark RDD can be converted to a DataFrame using toDF(), using createDataFrame(), or by transforming it into an RDD[Row] and applying an explicit schema.
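A minimal sketch of these approaches, assuming Spark 2.x and hypothetical example data:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local[*]").appName("RddToDf").getOrCreate()
import spark.implicits._  // enables toDF()

val rdd = spark.sparkContext.parallelize(Seq(("Alice", 30), ("Bob", 25)))

// 1) toDF() with explicit column names
val df1 = rdd.toDF("name", "age")

// 2) createDataFrame() on an RDD[Row] plus an explicit schema
val rowRdd = rdd.map { case (name, age) => Row(name, age) }
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("age", IntegerType, nullable = false)))
val df2 = spark.createDataFrame(rowRdd, schema)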

Can stream be converted to DataFrame?

You can convert from core Stream objects to Batch and DataFrame objects using the .to_batch and .to_dataframe methods. In each case we assume that the stream is a stream of batches (lists or tuples) or a list of Pandas dataframes.

Can we convert RDD to DataFrame in PySpark?

In PySpark, the toDF() function of the RDD is used to convert an RDD to a DataFrame.

Which method can be used to convert a Spark dataset to a DataFrame?

Methods for creating a Spark DataFrame: 1. Create a local collection and convert it with the createDataFrame() method on the SparkSession. 2. Convert an RDD to a DataFrame using the toDF() method.
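A short sketch of both methods, assuming Spark 2.x and a hypothetical Fruit case class:

import org.apache.spark.sql.SparkSession

case class Fruit(name: String, qty: Int)  // hypothetical example type

val spark = SparkSession.builder().master("local[*]").appName("CreateDf").getOrCreate()
import spark.implicits._

val data = Seq(Fruit("apple", 3), Fruit("pear", 5))

// 1) DataFrame from a local collection via the SparkSession
val dfFromList = spark.createDataFrame(data)

// 2) DataFrame from an RDD via toDF()
val dfFromRdd = spark.sparkContext.parallelize(data).toDF()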


1 Answer

Create the sqlContext outside foreachRDD. Once you convert the RDD to a DataFrame using that sqlContext, you can write it to S3.

For example:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._  // brings rdd.toDF() into scope

myDstream.foreachRDD { rdd =>
    val df = rdd.toDF()
    // DataFrameWriter has no saveAsTextFile(); use save() with the json format
    df.write.format("json").save("s3://iiiii/ttttt.json")
}
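Note that both the sqlContext and the function passed to foreachRDD live on the driver: foreachRDD runs its body on the driver once per micro-batch, so the context does not have to be serialized out to the executors; only the operations on the RDD/DataFrame itself are distributed. Also, since save() will not write to a path that already exists, a real job would typically derive a unique output path per batch or set an appropriate save mode.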

Update:

You can even create the sqlContext inside foreachRDD, since the foreachRDD body executes on the driver.
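A sketch of that variant, using the lazily-instantiated singleton pattern from the Spark Streaming programming guide (SQLContext.getOrCreate returns an existing instance if one has already been created); the timestamped output path is an illustrative addition, since save() refuses to write to an existing path:

myDstream.foreachRDD { rdd =>
    // runs on the driver once per micro-batch; reuses one SQLContext per JVM
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._

    val df = rdd.toDF()
    df.write.format("json").save(s"s3://iiiii/ttttt-${System.currentTimeMillis}.json")
}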

answered Sep 20 '22 by Shankar