Pandas dataframe to Spark dataframe, handling NaN conversions to actual null?

I want to convert a dataframe from pandas to Spark and I am using the spark_context.createDataFrame() method to create the dataframe. I'm also specifying the schema in the createDataFrame() method.

What I want to know is how to handle special cases. For example, NaN in pandas, when converted to a Spark dataframe, ends up being the string "NaN". I am looking for a way to get actual nulls instead of "NaN".
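A minimal sketch of what I'm doing (the column names and schema here are just illustrative, and spark_context is my session/context object):

import pandas as pd
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

pdf = pd.DataFrame({"a": [1.0, None], "b": ["x", None]})
schema = StructType([
    StructField("a", DoubleType(), True),
    StructField("b", StringType(), True),
])
sdf = spark_context.createDataFrame(pdf, schema=schema)
sdf.show()  # the missing float comes through as NaN rather than a null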

asked Jul 19 '17 by Prashil Sureja
2 Answers

TL;DR Your best option for now is to skip Pandas completely.

The source of the problem is that Pandas is less expressive than Spark SQL. Spark provides both NULL (in a SQL sense, a missing value) and NaN (a numeric Not a Number).

Pandas, on the other hand, doesn't have a native value that can be used to represent missing values. As a result it uses placeholders like NaN / NaT or Inf, which are indistinguishable to Spark from actual NaNs and Infs, and the conversion rules depend on the column type. The only exception is object columns (typically strings), which can contain None values. You can learn more about handling missing values in Pandas from the documentation.
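A quick pandas-only sketch (assuming default dtype inference) of how None gets coerced per column type:

import pandas as pd

print(pd.Series([1, None]))                             # numeric data is upcast to float64, None becomes NaN
print(pd.Series(["a", None]))                           # object columns keep None as-is
print(pd.Series(pd.to_datetime(["2012-01-01", None])))  # datetime64 columns store the gap as NaT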

For example, NaN in pandas, when converted to a Spark dataframe, ends up being the string "NaN".

This is actually not correct; it depends on the type of the input column. If the column shows NaN it is most likely a not-a-number value, not a plain string:

import pandas as pd
from pyspark.sql.functions import isnan, isnull

pdf = pd.DataFrame({
    "x": [1, None], "y": [None, "foo"], 
    "z": [pd.Timestamp("20120101"), pd.Timestamp("NaT")]
})
sdf = spark.createDataFrame(pdf)

sdf.show()
+---+----+-------------------+
|  x|   y|                  z|
+---+----+-------------------+
|1.0|null|2012-01-01 00:00:00|
|NaN| foo|               null|
+---+----+-------------------+
sdf.select([
    f(c) for c in sdf.columns for f in [isnan, isnull] 
    if (f, c) != (isnan, "z")  # isnan cannot be applied to timestamp 
]).show()
+--------+-----------+--------+-----------+-----------+
|isnan(x)|(x IS NULL)|isnan(y)|(y IS NULL)|(z IS NULL)|
+--------+-----------+--------+-----------+-----------+
|   false|      false|   false|       true|      false|
|    true|      false|   false|      false|       true|
+--------+-----------+--------+-----------+-----------+

In practice, parallelized local collections (including Pandas objects) have negligible importance beyond simple testing and toy examples, so you can always convert the data manually (skipping possible Arrow optimizations):

import numpy as np

spark.createDataFrame([
    tuple(
        # replace float NaN placeholders with None so Spark gets real NULLs
        None if isinstance(x, (float, int)) and np.isnan(x) else x
        for x in record.tolist())
    for record in pdf.to_records(index=False)
], pdf.columns.tolist()).show()
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|1325376000000000000|
|null| foo|               null|
+----+----+-------------------+
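Note that with this manual route the datetime column arrives as a plain long (nanoseconds since the epoch, as in the output above). A possible sketch for turning it back into a timestamp on the Spark side, assuming the frame was bound to a variable such as sdf_manual and that z really holds nanoseconds:

from pyspark.sql.functions import col

# sdf_manual is an assumed name for the manually converted frame
sdf_manual.withColumn("z", (col("z") / 1e9).cast("timestamp")).show()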

If the missing / not-a-number ambiguity is not an issue then just load the data as usual and replace the NaNs in Spark:

from pyspark.sql.functions import col, when 

sdf.select([
    when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c 
    for c, t in sdf.dtypes
]).show()
+----+----+-------------------+
|   x|   y|                  z|
+----+----+-------------------+
| 1.0|null|2012-01-01 00:00:00|
|null| foo|               null|
+----+----+-------------------+
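As a quick sanity check (a sketch reusing the isnan / isnull helpers from above, with the result bound to an assumed name cleaned):

cleaned = sdf.select([
    when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c
    for c, t in sdf.dtypes
])
cleaned.select(isnan("x"), isnull("x")).show()  # the former NaN row is now NULL, not NaN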
answered Nov 16 '22 by zero323

If you want to load a pandas df you can replace NaN with None:

import pandas as pd

def load_csv(spark, path):
    """read csv to spark df"""
    pd_df = pd.read_csv(path)
    # where() keeps values satisfying the condition and substitutes None elsewhere,
    # so NaN / NaT placeholders are replaced before handing the frame to Spark
    pd_df = pd_df.where((pd.notnull(pd_df)), None)
    df = spark.createDataFrame(pd_df)
    return df
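For example (a hedged sketch; the file path is a placeholder):

sdf = load_csv(spark, "data.csv")  # "data.csv" is a hypothetical path
sdf.printSchema()
sdf.show(5)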
answered Nov 16 '22 by justin cress