
Spark RDD to DataFrame python

I am trying to convert a Spark RDD to a DataFrame. I have seen the documentation and examples where the schema is passed to the sqlContext.createDataFrame(rdd, schema) function.

But I have 38 columns/fields, and this will increase further. If I manually specify the schema with each field's information, it is going to be a very tedious job.

Is there any other way to specify the schema without knowing the column information in advance?

asked Sep 26 '16 by Jack Daniel

People also ask

How do you convert a Spark RDD into a DataFrame?

Converting a Spark RDD to a DataFrame can be done using toDF(), using createDataFrame(), or by transforming an RDD[Row] into a DataFrame.

How do you create a DataFrame from an RDD in PySpark?

Method 1: Using the createDataFrame() function. After creating the RDD, we convert it to a DataFrame using the createDataFrame() function, passing in the RDD and the schema defined for the DataFrame.

Can we create DataFrame using RDD?

This method can take an RDD and create a DataFrame from it. createDataFrame is an overloaded method; we can call it by passing the RDD alone or together with a schema. When no schema is passed, the column names follow a default sequence of names based on a default template.
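For instance, here is a minimal sketch of that default behaviour (assuming an existing sqlContext and SparkContext sc; the sample data is made up):

    # made-up sample data: an RDD of 2-field tuples
    rdd = sc.parallelize([(1, "a"), (2, "b")])

    df = sqlContext.createDataFrame(rdd)  # no schema passed
    df.printSchema()  # the columns come out with template names _1, _2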


1 Answer

There are two ways to convert an RDD to a DataFrame in Spark:

toDF() and createDataFrame(rdd, schema)

I will show you how you can do that dynamically.

toDF()

The toDF() method gives you a way to convert an RDD[Row] to a DataFrame. The key point is that Row() can receive **kwargs. So there is an easy way to do it:

    from pyspark.sql.types import Row

    # here you are going to create a function
    def f(x):
        d = {}
        for i in range(len(x)):
            d[str(i)] = x[i]
        return d

    # now populate it
    df = rdd.map(lambda x: Row(**f(x))).toDF()

This way you will be able to create a DataFrame dynamically.
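As a minimal end-to-end sketch (assuming an existing SparkContext named sc and an active SQLContext/SparkSession, which toDF() needs; the sample data is made up):

    from pyspark.sql.types import Row

    # made-up sample data: an RDD of 3-field records
    rdd = sc.parallelize([(1, "a", 3.0), (2, "b", 4.0)])

    def f(x):
        d = {}
        for i in range(len(x)):
            d[str(i)] = x[i]
        return d

    df = rdd.map(lambda x: Row(**f(x))).toDF()
    df.show()  # columns are named 0, 1, 2

One caveat worth knowing: before Spark 3.0, Row(**kwargs) sorts its fields alphabetically by name, so with 10 or more columns the keys "0", "1", "10", "11", ..., "2" will not stay in numeric order.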

createDataFrame(rdd, schema)

The other way to do it is to create a dynamic schema. How?

This way:

    from pyspark.sql.types import StructType, StructField, StringType

    # one nullable StringType field per column; adjust the range to your column count
    schema = StructType([StructField(str(i), StringType(), True) for i in range(32)])

    df = sqlContext.createDataFrame(rdd, schema)

This second way is cleaner. And if you do not know the column count up front, you can also derive it from the data, as sketched below.
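A small variation (my sketch, not part of the original answer): derive the field count from the first record instead of hard-coding it, assuming every record in the RDD has the same length and the values are strings so they match the StringType fields:

    from pyspark.sql.types import StructType, StructField, StringType

    # assumption: all records have the same number of fields,
    # and the values are strings (to match StringType)
    num_cols = len(rdd.first())
    schema = StructType([StructField(str(i), StringType(), True) for i in range(num_cols)])

    df = sqlContext.createDataFrame(rdd, schema)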

So this is how you can create DataFrames dynamically.

answered Sep 26 '22 by Thiago Baldim