 

Pandas dataframe to Spark dataframe "Can not merge type error"

I have CSV data and created a Pandas dataframe using read_csv, forcing all columns to be read as strings. When I then try to create a Spark dataframe from the Pandas dataframe, I get the error message below.

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

z = pd.read_csv("mydata.csv", dtype=str)
z.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 74044003 entries, 0 to 74044002
Data columns (total 12 columns):
primaryid       object
event_dt        object
age             object
age_cod         object
age_grp         object
sex             object
occr_country    object
drug_seq        object
drugname        object
route           object
outc_cod        object
pt              object
q = sqlContext.createDataFrame(z)
File "<stdin>", line 1, in <module>
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame
    rdd, schema = self._createFromLocal(data, schema)
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal
    struct = self._inferSchemaFromList(data)
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type
    for f in a.fields]
File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type
    raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>

Here is a reproducible example. I download public data and create a Pandas dataframe from it, but Spark fails to create a Spark dataframe from the Pandas dataframe.

import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
import requests, zipfile, StringIO

url = "http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip"
r = requests.get(url, stream=True)
z = zipfile.ZipFile(StringIO.StringIO(r.content))
z.extractall()

z = pd.read_csv("demo2016q1.csv")  # creates pandas dataframe
Data_Frame = sqlContext.createDataFrame(z)
Asked Aug 05 '16 by Fisseha Berhane
2 Answers

Long story short, don't depend on schema inference. It is expensive and tricky in general. In particular, some columns in your data (for example event_dt_num) contain missing values, which pushes Pandas to represent them with mixed types (str for present values, float NaN for missing ones).
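The mixed-type situation described above can be reproduced in a few lines (a toy frame standing in for the real data; the column name is borrowed from the question):

```python
import numpy as np
import pandas as pd

# A toy object column mixing present values (str) with a missing value
# (np.nan, which is a float)
df = pd.DataFrame({"event_dt_num": ["20160101", np.nan, "20160301"]})

kinds = set(df["event_dt_num"].map(type))
# Spark's row-by-row inference sees StringType for the string rows and
# DoubleType for the NaN row, and _merge_type cannot reconcile them
```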

If you're in doubt, it is better to read all data as strings and cast afterwards. If you have access to a code book, you should always provide a schema to avoid problems and reduce the overall cost.
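Note that dtype=str alone (as in the question) does not remove the problem: read_csv still stores missing cells as float NaN. A minimal sketch of scrubbing them before handing the frame to Spark (toy inline CSV, Python 3 syntax; z.where is one of several ways to do the replacement):

```python
import pandas as pd
from io import StringIO

csv = StringIO("age,sex\n28,F\n,M\n")
z = pd.read_csv(csv, dtype=str)  # the missing 'age' is still NaN (a float)

z = z.where(z.notna(), None)     # replace every NaN with None
# sqlContext.createDataFrame(z) would now see clean all-string columns
```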

Finally, passing data through the driver is an anti-pattern. You should be able to read this data directly using the built-in csv format (Spark 2.0.0+) or the spark-csv library (Spark 1.6 and below):

df = (spark.read
      .format("csv")
      .options(header="true")
      .load("/path/to/demo2016q1.csv"))

## root
##  |-- primaryid: string (nullable = true)
##  |-- caseid: string (nullable = true)
##  |-- caseversion: string (nullable = true)
##  |-- i_f_code: string (nullable = true)
##  |-- i_f_code_num: string (nullable = true)
##   ...
##  |-- to_mfr: string (nullable = true)
##  |-- occp_cod: string (nullable = true)
##  |-- reporter_country: string (nullable = true)
##  |-- occr_country: string (nullable = true)
##  |-- occp_cod_num: string (nullable = true)

In this particular case, adding the inferSchema="true" option should work as well, but it is still better avoided. You can also provide a schema as follows:

from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [
    {'metadata': {}, 'name': 'primaryid', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'i_f_code_num', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'init_fda_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_cod_num', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'},
    {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'},
    {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'reporter_country', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occp_cod_num', 'nullable': True, 'type': 'integer'}],
  'type': 'struct'})

directly to the reader:

(spark.read
    .schema(schema)
    .format("csv")
    .options(header="true")
    .load("/path/to/demo2016q1.csv"))
Answered Oct 01 '22 by zero323
You could also try to:

  1. import your data as a Pandas dataframe,
  2. replace the NaNs with a string,
  3. then convert the Pandas dataframe into a Spark dataframe.
# overwrite the NaN entries in the column with a placeholder string
df.loc[df["column"].isna(), "column"] = "Nan values"
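The replacement in step 2 is usually written with fillna, which avoids positional indexing altogether (toy frame; the placeholder string is arbitrary):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"column": ["a", np.nan, "b"]})

# fillna replaces every NaN in the column with the placeholder string
df["column"] = df["column"].fillna("Nan values")
```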
Answered Oct 01 '22 by Miguel Velasco Postigo