 

Converting Pandas dataframe into Spark dataframe error

I'm trying to convert a Pandas DataFrame into a Spark DataFrame. The head of the DataFrame:

10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691

Code:

dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)

And I got an error:

TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'> 
asked May 29 '16 by Иван Судос

People also ask

Can a Pandas DataFrame be converted to a Spark DataFrame?

Spark provides a createDataFrame(pandas_dataframe) method to convert a pandas DataFrame to a Spark DataFrame; by default, Spark infers the PySpark data types from the pandas data types. If you want every column as a string, use spark.createDataFrame(pandasDF.astype(str)).
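A minimal sketch of both paths, using a small made-up DataFrame (the column names here are hypothetical, not from the question):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pandasToSpark").getOrCreate()

# toy pandas DataFrame, purely for illustration
pandas_df = pd.DataFrame({"id": [1, 2, 3], "status": ["OK", "OK", "PA"]})

inferred = spark.createDataFrame(pandas_df)                  # schema inferred from pandas dtypes
as_strings = spark.createDataFrame(pandas_df.astype(str))    # every column becomes StringType

inferred.printSchema()
as_strings.printSchema()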

Can we use Pandas DataFrame in PySpark?

pandas-on-Spark DataFrame and Spark DataFrame are virtually interchangeable. However, note that a new default index is created when pandas-on-Spark DataFrame is created from Spark DataFrame. See Default Index Type. In order to avoid this overhead, specify the column to use as an index when possible.
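A rough sketch of that round trip, assuming Spark 3.2+ where the pandas API on Spark ships with PySpark:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

sdf = spark.range(5)                     # a plain Spark DataFrame with a single "id" column
psdf = sdf.pandas_api(index_col="id")    # reuse "id" as the index instead of generating a default one
back = psdf.to_spark(index_col="id")     # round-trip back to a Spark DataFrame, keeping the index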

How do I change Pandas code to PySpark?

The easiest way to convert Pandas DataFrames to PySpark is through Apache Arrow. Apache Arrow is a language-independent, in-memory columnar format that can be used to optimize the conversion between Spark and Pandas DataFrames when using toPandas() or createDataFrame().
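For example, something like the following (the config key below is the Spark 3.x name; on Spark 2.3/2.4 it was spark.sql.execution.arrow.enabled):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# ask Spark to use Arrow for pandas <-> Spark conversions;
# by default it falls back to the slow path if Arrow cannot be used
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

pdf = pd.DataFrame({"x": range(1000)})   # toy data
sdf = spark.createDataFrame(pdf)         # Arrow-accelerated when the dtypes are supported
pdf_back = sdf.toPandas()                # Arrow also speeds up the reverse direction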

Why does toPandas take so long?

Looking at the source code for toPandas(), one reason it may be slow is that it first creates the pandas DataFrame and then copies each of the Series in that DataFrame over to the returned DataFrame.
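Since everything that remains is collected to the driver, it also pays to shrink the Spark DataFrame first. A sketch (the data and column names are made up):

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark_df = spark.createDataFrame(pd.DataFrame({"col1": range(10000), "col5": ["OK"] * 10000}))

# only collect what you actually need to the driver
small_pdf = (
    spark_df
    .select("col1")      # keep only the columns you need
    .limit(1000)         # cap the number of rows pulled back
    .toPandas()
)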


2 Answers

I made this script; it worked for my 10 pandas DataFrames:

from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    # map a pandas dtype to the closest Spark SQL type
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return FloatType()
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except: typo = StringType()
    return StructField(string, typo)

# Given a pandas dataframe, return a Spark dataframe.
# Assumes a SQLContext named sqlContext already exists in scope.
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    return sqlContext.createDataFrame(pandas_df, p_schema)

You can also see it in this gist.

With this you just have to call spark_df = pandas_to_spark(pandas_df)

answered Sep 23 '22 by Gonzalo Garcia

Type related errors can be avoided by imposing a schema as follows:

Note: a text file (test.csv) was created with the original data (as above), and hypothetical column names were inserted ("col1", "col2", ..., "col25").

import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

pdDF = pd.read_csv("test.csv")

contents of the pandas data frame:

       col1     col2    col3    col4    col5    col6    col7    col8   ...
0      10000001 1       0       1       12:35   OK      10002   1      ...
1      10000001 2       0       1       12:36   OK      10002   1      ...
2      10000002 1       0       4       12:19   PA      10003   1      ...

Next, create the schema:

from pyspark.sql.types import *

mySchema = StructType([StructField("col1", LongType(), True),
                       StructField("col2", IntegerType(), True),
                       StructField("col3", IntegerType(), True),
                       StructField("col4", IntegerType(), True),
                       StructField("col5", StringType(), True),
                       StructField("col6", StringType(), True),
                       StructField("col7", IntegerType(), True),
                       StructField("col8", IntegerType(), True),
                       StructField("col9", IntegerType(), True),
                       StructField("col10", IntegerType(), True),
                       StructField("col11", StringType(), True),
                       StructField("col12", StringType(), True),
                       StructField("col13", IntegerType(), True),
                       StructField("col14", IntegerType(), True),
                       StructField("col15", IntegerType(), True),
                       StructField("col16", IntegerType(), True),
                       StructField("col17", IntegerType(), True),
                       StructField("col18", IntegerType(), True),
                       StructField("col19", IntegerType(), True),
                       StructField("col20", IntegerType(), True),
                       StructField("col21", IntegerType(), True),
                       StructField("col22", IntegerType(), True),
                       StructField("col23", IntegerType(), True),
                       StructField("col24", IntegerType(), True),
                       StructField("col25", IntegerType(), True)])

Note: the third argument to StructField (True) means the column is nullable.

create the pyspark dataframe:

df = spark.createDataFrame(pdDF, schema=mySchema)

confirm the pandas data frame is now a pyspark data frame:

type(df) 

output:

pyspark.sql.dataframe.DataFrame 

Aside:

To address Kate's comment below - to impose a general (String) schema you can do the following:

df = spark.createDataFrame(pdDF.astype(str))
answered Sep 22 '22 by Grant Shannon