I have CSV data and created a Pandas dataframe using read_csv, forcing all columns to be read as strings. When I then try to create a Spark dataframe from the Pandas dataframe, I get the error message below.
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

z = pd.read_csv("mydata.csv", dtype=str)
z.info()
<class 'pandas.core.frame.DataFrame'>
Int64Index: 74044003 entries, 0 to 74044002
Data columns (total 12 columns):
primaryid       object
event_dt        object
age             object
age_cod         object
age_grp         object
sex             object
occr_country    object
drug_seq        object
drugname        object
route           object
outc_cod        object
pt              object
q= sqlContext.createDataFrame(z)
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 425, in createDataFrame
    rdd, schema = self._createFromLocal(data, schema)
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 341, in _createFromLocal
    struct = self._inferSchemaFromList(data)
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/context.py", line 241, in _inferSchemaFromList
    schema = reduce(_merge_type, map(_infer_schema, data))
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 862, in _merge_type
    for f in a.fields]
  File "/usr/hdp/2.4.2.0-258/spark/python/pyspark/sql/types.py", line 856, in _merge_type
    raise TypeError("Can not merge type %s and %s" % (type(a), type(b)))
TypeError: Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>
Here is an example. I download public data and create a Pandas dataframe from it, but Spark fails to create a Spark dataframe from the Pandas dataframe.
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *

url = "http://www.nber.org/fda/faers/2016/demo2016q1.csv.zip"

import requests, zipfile, StringIO
r = requests.get(url, stream=True)
z = zipfile.ZipFile(StringIO.StringIO(r.content))
z.extractall()

z = pd.read_csv("demo2016q1.csv")  # creates pandas dataframe
Data_Frame = sqlContext.createDataFrame(z)
Long story short: don't depend on schema inference. It is expensive and tricky in general. In particular, some columns (for example event_dt_num) in your data have missing values, which pushes Pandas to represent them as mixed types (string for non-missing values, NaN for missing ones).
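To make the mixed-type point concrete, here is a minimal sketch using a hypothetical two-row sample (the CSV text below is made up and is not taken from the FAERS file). It assumes a reasonably recent pandas and only illustrates that dtype=str still leaves NaN, which is a float, in place of empty fields, while keep_default_na=False keeps them as empty strings.

```python
import io

import pandas as pd

# Hypothetical two-row sample; the second row has no event_dt value.
csv_text = "primaryid,event_dt\n1,20160101\n2,\n"

# dtype=str does not stop pandas from turning empty fields into NaN (a float),
# so the column ends up holding both str and float objects.
pdf = pd.read_csv(io.StringIO(csv_text), dtype=str)
mixed_types = sorted({type(v).__name__ for v in pdf["event_dt"]})
print(mixed_types)

# keep_default_na=False keeps empty fields as empty strings instead of NaN.
pdf_str = pd.read_csv(io.StringIO(csv_text), dtype=str, keep_default_na=False)
only_str = sorted({type(v).__name__ for v in pdf_str["event_dt"]})
print(only_str)
```

A column that mixes str and float values is the kind of input that row-by-row type inference can trip over, which matches the DoubleType versus StringType conflict in the traceback.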
If you're in doubt, it is better to read all data as strings and cast afterwards. If you have access to a code book, you should always provide a schema to avoid problems and reduce the overall cost.
Finally, passing data from the driver is an anti-pattern. You should be able to read this data directly using the csv format (Spark 2.0.0+) or the spark-csv library (Spark 1.6 and below):
df = (spark.read.format("csv").options(header="true")
    .load("/path/to/demo2016q1.csv"))

df.printSchema()

## root
##  |-- primaryid: string (nullable = true)
##  |-- caseid: string (nullable = true)
##  |-- caseversion: string (nullable = true)
##  |-- i_f_code: string (nullable = true)
##  |-- i_f_code_num: string (nullable = true)
## ...
##  |-- to_mfr: string (nullable = true)
##  |-- occp_cod: string (nullable = true)
##  |-- reporter_country: string (nullable = true)
##  |-- occr_country: string (nullable = true)
##  |-- occp_cod_num: string (nullable = true)
In this particular case, adding the inferSchema="true" option should work as well, but it is still better to avoid it. You can also provide a schema as follows:
from pyspark.sql.types import StructType

schema = StructType.fromJson({'fields': [
    {'metadata': {}, 'name': 'primaryid', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'caseid', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'caseversion', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'i_f_code', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'i_f_code_num', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'event_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'event_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'mfr_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'init_fda_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'init_fda_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'fda_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'fda_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_cod_num', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'auth_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'mfr_sndr', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'lit_ref', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age', 'nullable': True, 'type': 'double'},
    {'metadata': {}, 'name': 'age_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age_grp', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'age_grp_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'sex', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'e_sub', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'wt', 'nullable': True, 'type': 'double'},
    {'metadata': {}, 'name': 'wt_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'rept_dt', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'rept_dt_num', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'to_mfr', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occp_cod', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'reporter_country', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occr_country', 'nullable': True, 'type': 'string'},
    {'metadata': {}, 'name': 'occp_cod_num', 'nullable': True, 'type': 'integer'},
], 'type': 'struct'})
and pass it directly to the reader:
(spark.read.schema(schema).format("csv").options(header="true")
    .load("/path/to/demo2016q1.csv"))
You could also try replacing the missing values in the Pandas dataframe with a placeholder string before passing it to Spark:
df.loc[df["column"].isna(), "column"] = "Nan values"