I want to convert a dataframe from pandas to Spark and I am using the spark_context.createDataFrame() method to create the dataframe. I'm also specifying the schema in the createDataFrame() method.

What I want to know is how to handle special cases. For example, NaN in pandas, when converted to a Spark dataframe, ends up being the string "NaN". I am looking for a way to get actual nulls instead of "NaN".
TL;DR Your best option for now is to skip Pandas completely.
The source of the problem is that Pandas is less expressive than Spark SQL. Spark provides both NULL (in a SQL sense, as a missing value) and NaN (numeric Not a Number).

Pandas, on the other hand, doesn't have a native value that can be used to represent missing data. As a result it uses placeholders like NaN / NaT or Inf, which are indistinguishable to Spark from actual NaNs and Infs, and the conversion rules depend on the column type. The only exception are object columns (typically strings), which can contain None values. You can learn more about handling missing values in Pandas in the documentation.
For example, NaN in pandas when converted to Spark dataframe ends up being string "NaN".
This is actually not correct; it depends on the type of the input column. If the column shows NaN, it is most likely an actual not-a-number value, not a plain string:
import pandas as pd
from pyspark.sql.functions import isnan, isnull

# assumes an active SparkSession named `spark`
pdf = pd.DataFrame({
    "x": [1, None], "y": [None, "foo"],
    "z": [pd.Timestamp("20120101"), pd.Timestamp("NaT")]
})
sdf = spark.createDataFrame(pdf)

sdf.show()
+---+----+-------------------+
| x| y| z|
+---+----+-------------------+
|1.0|null|2012-01-01 00:00:00|
|NaN| foo| null|
+---+----+-------------------+
sdf.select([
    f(c) for c in sdf.columns for f in [isnan, isnull]
    if (f, c) != (isnan, "z")  # isnan cannot be applied to timestamps
]).show()
+--------+-----------+--------+-----------+-----------+
|isnan(x)|(x IS NULL)|isnan(y)|(y IS NULL)|(z IS NULL)|
+--------+-----------+--------+-----------+-----------+
| false| false| false| true| false|
| true| false| false| false| true|
+--------+-----------+--------+-----------+-----------+
In practice, parallelized local collections (including Pandas objects) matter little beyond simple testing and toy examples, so you can always convert the data manually (skipping possible Arrow optimizations):
import numpy as np

spark.createDataFrame([
    tuple(
        None if isinstance(x, (float, int)) and np.isnan(x) else x
        for x in record.tolist()
    )
    for record in pdf.to_records(index=False)
], pdf.columns.tolist()).show()
+----+----+-------------------+
| x| y| z|
+----+----+-------------------+
| 1.0|null|1325376000000000000|
|null| foo| null|
+----+----+-------------------+
Note that to_records followed by tolist() turns the nanosecond-precision timestamps into plain integers, which is why the z column shows a long value above.

If the missing / not-a-number ambiguity is not an issue, just load the data as usual and do the replacement in Spark:
from pyspark.sql.functions import col, isnan, when

sdf.select([
    when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c
    for c, t in sdf.dtypes
]).show()
+----+----+-------------------+
| x| y| z|
+----+----+-------------------+
| 1.0|null|2012-01-01 00:00:00|
|null| foo| null|
+----+----+-------------------+
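The same approach extends to the Inf placeholders mentioned above. A rough sketch, assuming you also want positive and negative infinity mapped to null (reusing sdf from the earlier example):

from pyspark.sql.functions import col, isnan, when

sdf.select([
    # turn NaN, Inf and -Inf in floating point columns into null,
    # leave every other column untouched
    when(isnan(col(c)) | col(c).isin(float("inf"), float("-inf")), None)
    .otherwise(col(c))
    .alias(c)
    if t in ("double", "float")
    else col(c)
    for c, t in sdf.dtypes
]).show()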
If you want to load a pandas dataframe, you can replace NaN with None before the conversion:
import pandas as pd

def load_csv(spark, path):
    """read csv to spark df"""
    pd_df = pd.read_csv(path)
    pd_df = pd_df.where(pd.notnull(pd_df), None)
    df = spark.createDataFrame(pd_df)
    return df
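Usage would then look something like this ("data.csv" is just a hypothetical path; any NaN produced by read_csv is replaced with None before the Spark conversion):

df = load_csv(spark, "data.csv")
df.printSchema()
df.show()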