I'm trying to convert a pandas DataFrame into a Spark DataFrame. Here is the head of the DataFrame:
10000001,1,0,1,12:35,OK,10002,1,0,9,f,NA,24,24,0,3,9,0,0,1,1,0,0,4,543
10000001,2,0,1,12:36,OK,10002,1,0,9,f,NA,24,24,0,3,9,2,1,1,3,1,3,2,611
10000002,1,0,4,12:19,PA,10003,1,1,7,f,NA,74,74,0,2,15,2,0,2,3,1,2,2,691
Code:
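import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("pandasToSpark")  # conf was not shown in the original snippet; app name is assumed
dataset = pd.read_csv("data/AS/test_v2.csv")
sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(dataset)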
And I got an error:
TypeError: Can not merge type <class 'pyspark.sql.types.StringType'> and <class 'pyspark.sql.types.DoubleType'>
Spark provides a createDataFrame(pandas_dataframe) method to convert a pandas DataFrame to a Spark DataFrame. By default, Spark infers the schema by mapping the pandas data types to PySpark data types. If you want every column to be a string, use spark.createDataFrame(pandasDF.astype(str)).
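The "Can not merge type StringType and DoubleType" error usually means one object column holds both strings and floats. A common cause is a column like the `NA` one in the question: pandas parses `NA` as NaN (a float), so if the same column also contains text, row-by-row inference sees both types. The sketch below reproduces that with a hypothetical two-column CSV (the `id`/`flag` names and the `x` value are made up for illustration) and shows two ways to clean it before calling `createDataFrame`. It only uses pandas, so it runs without a Spark session.

```python
import io

import pandas as pd

# Hypothetical sample: "flag" mixes NA (parsed as NaN, a float) with a text value
csv_text = "id,flag\n10000001,NA\n10000002,x\n10000003,NA\n"
pdf = pd.read_csv(io.StringIO(csv_text))

# The column holds both str and float objects, which Spark's inference
# reports as StringType vs DoubleType
value_types = {type(v).__name__ for v in pdf["flag"]}

# Option 1: cast every column to str (in pandas 2.x, NaN becomes the literal string "nan")
as_strings = pdf.astype(str)

# Option 2: keep real nulls by swapping NaN for None, which Spark treats as null
with_nulls = pdf.astype(object).where(pdf.notna(), None)
print(with_nulls["flag"].tolist())  # [None, 'x', None]
```

Either frame can then be passed to spark.createDataFrame(...). Option 2 keeps missing values as nulls instead of "nan" strings, but a column that is entirely None still cannot be inferred, so pairing it with an explicit schema is the safer choice.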
A pandas-on-Spark DataFrame and a Spark DataFrame are virtually interchangeable. However, note that a new default index is created when a pandas-on-Spark DataFrame is created from a Spark DataFrame. See Default Index Type. To avoid this overhead, specify the column to use as the index when possible.
Apache Arrow can make converting pandas DataFrames to PySpark (and back) considerably faster. Apache Arrow is a language-independent, in-memory columnar format that Spark can use to optimize the conversion between Spark and pandas DataFrames in toPandas() and createDataFrame().
Looking at the source code for toPandas(), one reason it may be slow is that it first creates a pandas DataFrame and then copies each Series in that DataFrame over to the returned DataFrame.
I wrote this script; it worked for my 10 pandas DataFrames.
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Auxiliary functions
def equivalent_type(f):
    # Map a pandas dtype to a Spark SQL type; anything unrecognized becomes a string
    if f == 'datetime64[ns]': return TimestampType()
    elif f == 'int64': return LongType()
    elif f == 'int32': return IntegerType()
    elif f == 'float64': return DoubleType()  # float64 is double precision; FloatType would lose precision
    else: return StringType()

def define_structure(string, format_type):
    try: typo = equivalent_type(format_type)
    except Exception: typo = StringType()
    return StructField(string, typo)

# Given a pandas DataFrame, return a Spark DataFrame
def pandas_to_spark(pandas_df):
    columns = list(pandas_df.columns)
    types = list(pandas_df.dtypes)
    struct_list = []
    for column, typo in zip(columns, types):
        struct_list.append(define_structure(column, typo))
    p_schema = StructType(struct_list)
    spark = SparkSession.builder.getOrCreate()  # sqlContext was undefined in the original
    return spark.createDataFrame(pandas_df, p_schema)
You can also find it in this gist.
With this, you just have to call spark_df = pandas_to_spark(pandas_df).
Type-related errors can be avoided by imposing a schema, as follows:
Note: a text file (test.csv) was created with the original data above, and hypothetical column names ("col1", "col2", ..., "col25") were added as a header row.
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName('pandasToSparkDF').getOrCreate()
pdDF = pd.read_csv("test.csv")
Contents of the pandas DataFrame:
       col1  col2  col3  col4   col5 col6   col7  col8  ...
0  10000001     1     0     1  12:35   OK  10002     1  ...
1  10000001     2     0     1  12:36   OK  10002     1  ...
2  10000002     1     0     4  12:19   PA  10003     1  ...
Next, create the schema:
from pyspark.sql.types import *

mySchema = StructType([
    StructField("col1", LongType(), True),
    StructField("col2", IntegerType(), True),
    StructField("col3", IntegerType(), True),
    StructField("col4", IntegerType(), True),
    StructField("col5", StringType(), True),
    StructField("col6", StringType(), True),
    StructField("col7", IntegerType(), True),
    StructField("col8", IntegerType(), True),
    StructField("col9", IntegerType(), True),
    StructField("col10", IntegerType(), True),
    StructField("col11", StringType(), True),
    StructField("col12", StringType(), True),
    StructField("col13", IntegerType(), True),
    StructField("col14", IntegerType(), True),
    StructField("col15", IntegerType(), True),
    StructField("col16", IntegerType(), True),
    StructField("col17", IntegerType(), True),
    StructField("col18", IntegerType(), True),
    StructField("col19", IntegerType(), True),
    StructField("col20", IntegerType(), True),
    StructField("col21", IntegerType(), True),
    StructField("col22", IntegerType(), True),
    StructField("col23", IntegerType(), True),
    StructField("col24", IntegerType(), True),
    StructField("col25", IntegerType(), True)])
Note: the third argument, True, means the field is nullable.
Create the PySpark DataFrame:
df = spark.createDataFrame(pdDF,schema=mySchema)
Confirm that the pandas DataFrame is now a PySpark DataFrame:
type(df)
output:
pyspark.sql.dataframe.DataFrame
Aside:
To impose a general (all-String) schema, you can do the following:
df=spark.createDataFrame(pdDF.astype(str))