
How to convert an RDD[Row] back to DataFrame [duplicate]

I've been playing around with converting RDDs to DataFrames and back again. First, I had an RDD of type (Int, Int) called dataPair. Then I created a DataFrame object with column headers using:

val dataFrame = dataPair.toDF(header(0), header(1))

Then I converted it from a DataFrame back to an RDD using:

val testRDD = dataFrame.rdd

which returns an RDD of type org.apache.spark.sql.Row (not (Int, Int)). I'd then like to convert it back to a DataFrame using .toDF, but I get an error:

error: value toDF is not a member of org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]

I've tried defining a schema of type Data(Int, Int) for testRDD, but I get a type mismatch error:

error: type mismatch;
found   : org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
required: org.apache.spark.rdd.RDD[Data]
    val testRDD: RDD[Data] = dataFrame.rdd
                                       ^

I've already imported

import sqlContext.implicits._
asked May 03 '16 by TheElysian


People also ask

How do you convert an RDD into a DataFrame or Dataset?

Convert Using the createDataFrame Method. This method can take an RDD and create a DataFrame from it. createDataFrame is an overloaded method; you can call it with the RDD alone or with an explicit schema. Without a schema, the column names follow a default naming template (_1, _2, ...).
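
For example, a minimal Scala sketch (the SparkSession name and the data are made up for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate()

// A small RDD of tuples, invented for illustration
val rdd = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b")))

// No schema passed: the columns get the default names _1, _2, ...
val df = spark.createDataFrame(rdd)
df.printSchema()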

How do I convert a row to a DataFrame in Spark?

Converting a Spark RDD to a DataFrame can be done with toDF(), with createDataFrame(), or by transforming an RDD[Row] with an explicit schema.
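
A quick sketch of the toDF() route (assuming a SparkSession named spark and made-up data):

import spark.implicits._  // toDF() is brought into scope by this import

val pairRDD = spark.sparkContext.parallelize(Seq((1, "a"), (2, "b")))
val df = pairRDD.toDF("id", "label")  // explicit column names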

Can we convert RDD to DataFrame in PySpark?

Yes. One common method is the createDataFrame() function: after creating the RDD, convert it to a DataFrame by passing the RDD along with a schema to createDataFrame().

How do I duplicate a row in Spark?

To duplicate every record in a DataFrame N times, add a new column to the DataFrame whose literal value is an array of size N, and then use the explode function so that each element of the array produces its own row.
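
A Scala sketch of that trick (df and the helper column name are hypothetical):

import org.apache.spark.sql.functions.{array, explode, lit}

val n = 3  // how many copies of each row we want
val duplicated = df
  .withColumn("dup", explode(array((0 until n).map(i => lit(i)): _*)))  // array of size n, then one row per element
  .drop("dup")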

What does RDD collect () return?

collect() returns a list containing all of the elements in the RDD. It should only be used if the resulting array is expected to be small, as all the data is loaded into the driver's memory.
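
In Scala the result is an Array (a tiny sketch, assuming a SparkSession named spark):

val rdd = spark.sparkContext.parallelize(1 to 5)
val all: Array[Int] = rdd.collect()  // Array(1, 2, 3, 4, 5), all pulled into the driver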

Is it possible to modify an RDD to make it contain different values?

RDDs are immutable (read-only) by nature. You cannot change an original RDD, but you can create new RDDs by performing coarse-grained operations, like transformations, on an existing RDD.
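
For example (made-up data), a transformation returns a new RDD and leaves the original untouched:

val original = spark.sparkContext.parallelize(Seq(1, 2, 3))
val doubled  = original.map(_ * 2)  // a new RDD; `original` itself is unchanged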

How do I get schema from RDD?

We can create a DataFrame programmatically in three steps:

  1. Create an RDD of Rows from the original RDD.
  2. Create the schema, represented by a StructType, matching the structure of the Rows in the RDD from step 1.
  3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SQLContext.
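
A sketch of those three steps (the column names and data are made up; spark is an assumed SparkSession):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Step 1: an RDD of Rows
val rowRDD = spark.sparkContext.parallelize(Seq(Row(1, 2), Row(3, 4)))

// Step 2: a StructType describing the Rows
val schema = StructType(Seq(
  StructField("a", IntegerType, nullable = false),
  StructField("b", IntegerType, nullable = false)
))

// Step 3: apply the schema
val df = spark.createDataFrame(rowRDD, schema)
df.schema  // the schema is now attached to the DataFrame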

How do I remove duplicates in Spark RDD?

Duplicate rows can be removed from a Spark SQL DataFrame using the distinct() and dropDuplicates() functions: distinct() removes rows that have the same values in all columns, whereas dropDuplicates() removes rows that have the same values in a selected subset of columns.
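
For example (df and the column name "id" are hypothetical):

val allColumns      = df.distinct()             // drop rows identical in every column
val selectedColumns = df.dropDuplicates("id")   // drop rows with a duplicate "id" value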


1 Answer

To create a DataFrame from an RDD of Rows, you usually have two main options:

1) You can use toDF(), which is brought into scope by import sqlContext.implicits._. However, this approach only works for the following types of RDDs:

  • RDD[Int]
  • RDD[Long]
  • RDD[String]
  • RDD[T <: scala.Product]

(source: Scaladoc of the SQLContext.implicits object)

The last signature actually means that it can work for an RDD of tuples or an RDD of case classes (because tuples and case classes are subclasses of scala.Product).

So, to use this approach for an RDD[Row], you have to map it to an RDD[T <: scala.Product]. This can be done by mapping each row to a custom case class or to a tuple, as in the following code snippets:

val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => (val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")

or

case class MyClass(val1: String, ..., valN: Long = 0L)
val df = rdd.map({ 
  case Row(val1: String, ..., valN: Long) => MyClass(val1, ..., valN)
}).toDF("col1_name", ..., "colN_name")
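
For a concrete two-column case like the RDD[(Int, Int)] in the question, the first snippet boils down to this sketch (the column names are illustrative):

import org.apache.spark.sql.Row
import sqlContext.implicits._

val df = testRDD.map {
  case Row(a: Int, b: Int) => (a, b)  // Row -> tuple, a subtype of scala.Product
}.toDF("col1", "col2")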

The main drawback of this approach (in my opinion) is that you have to explicitly set the schema of the resulting DataFrame in the map function, column by column. Maybe this can be done programmatically if you don't know the schema in advance, but things can get a little messy there. So, alternatively, there is another option:


2) You can use createDataFrame(rowRDD: RDD[Row], schema: StructType), which is available in the SQLContext object. Example:

val df = oldDF.sqlContext.createDataFrame(rdd, oldDF.schema)

Note that there is no need to explicitly set any schema column: we reuse the old DataFrame's schema, which is of the StructType class and can be easily extended. However, this approach is sometimes not possible, and in some cases it can be less efficient than the first one.
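
Applied to the question's round trip, this is a one-liner (reusing the dataFrame and testRDD names from the question):

val df2 = dataFrame.sqlContext.createDataFrame(testRDD, dataFrame.schema)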

I hope it's clearer than before. Cheers.

answered Oct 16 '22 by Daniel de Paula