I have a bizarre issue with PySpark when indexing a column of strings in my features. Here is my tmp.csv file:
x0,x1,x2,x3
asd2s,1e1e,1.1,0
asd2s,1e1e,0.1,0
,1e3e,1.2,0
bd34t,1e1e,5.1,1
asd2s,1e3e,0.2,0
bd34t,1e2e,4.3,1
where there is one missing value for 'x0'. First, I read the features from the csv file into a DataFrame using pyspark_csv (https://github.com/seahboonsiew/pyspark-csv), then index x0 with StringIndexer:
import pyspark_csv as pycsv
from pyspark.ml.feature import StringIndexer
sc.addPyFile('pyspark_csv.py')
features = pycsv.csvToDataFrame(sqlCtx, sc.textFile('tmp.csv'))
indexer = StringIndexer(inputCol='x0', outputCol='x0_idx' )
ind = indexer.fit(features).transform(features)
print(ind.collect())
When calling ind.collect(), Spark throws java.lang.NullPointerException. Everything works fine for a complete column, e.g. 'x1'.
Does anyone have a clue what is causing this and how to fix it?
Thanks in advance!
Sergey
Update:
I use Spark 1.5.1. The exact error:
File "/spark/spark-1.4.1-bin-hadoop2.6/python/pyspark/sql/dataframe.py", line 258, in show
print(self._jdf.showString(n))
File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
File "/spark/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o444.showString.
: java.lang.NullPointerException
at org.apache.spark.sql.types.Metadata$.org$apache$spark$sql$types$Metadata$$hash(Metadata.scala:208)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
at org.apache.spark.sql.types.Metadata$$anonfun$org$apache$spark$sql$types$Metadata$$hash$2.apply(Metadata.scala:196)
... etc
I've tried to create the same DataFrame without reading the csv file:
df = sqlContext.createDataFrame(
[('asd2s','1e1e',1.1,0), ('asd2s','1e1e',0.1,0),
(None,'1e3e',1.2,0), ('bd34t','1e1e',5.1,1),
('asd2s','1e3e',0.2,0), ('bd34t','1e2e',4.3,1)],
['x0','x1','x2','x3'])
and it gives the same error. A slightly different example works fine:
df = sqlContext.createDataFrame(
[(0, None, 1.2), (1, '06330986ed', 2.3),
(2, 'b7584c2d52', 2.5), (3, None, .8),
(4, 'bd17e19b3a', None), (5, '51b5c0f2af', 0.1)],
['id', 'x0', 'num'])
# after indexing x0
+---+----------+----+------+
| id| x0| num|x0_idx|
+---+----------+----+------+
| 0| null| 1.2| 0.0|
| 1|06330986ed| 2.3| 2.0|
| 2|b7584c2d52| 2.5| 4.0|
| 3| null| 0.8| 0.0|
| 4|bd17e19b3a|null| 1.0|
| 5|51b5c0f2af| 0.1| 3.0|
+---+----------+----+------+
Update 2:
I've just discovered the same issue in Scala, so I guess it's a Spark bug, not only a PySpark one. In particular, the data frame
val df = sqlContext.createDataFrame(
Seq(("asd2s","1e1e",1.1,0), ("asd2s","1e1e",0.1,0),
(null,"1e3e",1.2,0), ("bd34t","1e1e",5.1,1),
("asd2s","1e3e",0.2,0), ("bd34t","1e2e",4.3,1))
).toDF("x0","x1","x2","x3")
throws java.lang.NullPointerException when indexing the 'x0' feature. Moreover, when indexing 'x0' in the following data frame
val df = sqlContext.createDataFrame(
Seq((0, null, 1.2), (1, "b", 2.3),
(2, "c", 2.5), (3, "a", 0.8),
(4, "a", null), (5, "c", 0.1))
).toDF("id", "x0", "num")
I get 'java.lang.UnsupportedOperationException: Schema for type Any is not supported', which is caused by the missing 'num' value in the 5th row. If one replaces it with a number, everything works well, even with the missing value in the 1st row.
I've also tried an older version of Spark (1.4.1), and the result is the same.
It looks like the module you're using converts empty strings to nulls, and this interferes with downstream processing at some point. At first glance it looks like a PySpark bug.
How to fix it? A simple workaround is to either drop nulls before indexing:
features.na.drop()
or replace nulls with some placeholder:
from pyspark.sql.functions import col, when
features.withColumn(
"x0", when(col("x0").isNull(), "__SOME_PLACEHOLDER__").otherwise(col("x0")))
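To see why the placeholder workaround is enough, here is a minimal pure-Python sketch of frequency-based label indexing, applied to the x0 column from the question. It is not Spark's implementation: `index_labels` and `PLACEHOLDER` are hypothetical names, and the alphabetical tie-break is an assumption. The point is only that once the missing value is replaced, the placeholder becomes an ordinary label with its own index.

```python
from collections import Counter

PLACEHOLDER = "__MISSING__"  # hypothetical placeholder label, not a Spark constant


def index_labels(values, placeholder=PLACEHOLDER):
    """Map each label to a float index, most frequent label first."""
    # Replace missing values first so every row carries a real string label.
    cleaned = [placeholder if v is None or v == "" else v for v in values]
    counts = Counter(cleaned)
    # Sort by descending frequency; ties are broken alphabetically (an assumption).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    mapping = {label: float(i) for i, label in enumerate(ordered)}
    return [mapping[v] for v in cleaned], mapping


# The x0 column from tmp.csv, with the empty field read as None.
x0 = ["asd2s", "asd2s", None, "bd34t", "asd2s", "bd34t"]
indexed, mapping = index_labels(x0)
print(mapping)
print(indexed)
```

In this sketch the placeholder is the least frequent label, so it receives the highest index; no null ever reaches the indexing step.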
Also, you could use spark-csv. It is efficient, tested and, as a bonus, doesn't convert empty strings to nulls.
features = (sqlContext.read
.format('com.databricks.spark.csv')
.option("inferSchema", "true")
.option("header", "true")
.load("tmp.csv"))
Well, currently the only solution is either to get rid of the NAs as @zero323 proposed, or to convert the Spark DataFrame to a Pandas DataFrame using the toPandas() method, impute the data using sklearn's Imputer or a custom imputer (e.g. the one from "Impute categorical missing values in scikit-learn"), then convert the Pandas DataFrame back to a Spark DataFrame and work with it. Still, the issue remains; I'll try to submit a bug report if none exists. I'm relatively new to Spark, so there is a chance I'm missing something.
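The imputation step itself does not depend on pandas or sklearn. As a rough illustration, here is a pure-Python sketch of most-frequent imputation for one categorical column, using the rows from the question. `impute_most_frequent` is a hypothetical helper, not part of any library; on a Pandas DataFrame the same idea would typically be a `fillna` with the column's mode.

```python
from collections import Counter


def impute_most_frequent(rows, col):
    """Return new rows where None in position `col` is replaced by the column's mode."""
    observed = [row[col] for row in rows if row[col] is not None]
    if not observed:
        # Nothing to learn the mode from; return the rows unchanged.
        return list(rows)
    fill = Counter(observed).most_common(1)[0][0]
    return [
        tuple(fill if (i == col and v is None) else v for i, v in enumerate(row))
        for row in rows
    ]


rows = [
    ("asd2s", "1e1e", 1.1, 0), ("asd2s", "1e1e", 0.1, 0),
    (None, "1e3e", 1.2, 0), ("bd34t", "1e1e", 5.1, 1),
    ("asd2s", "1e3e", 0.2, 0), ("bd34t", "1e2e", 4.3, 1),
]
imputed = impute_most_frequent(rows, col=0)
print(imputed[2])
```

The original rows are left untouched, so the imputed copy can be passed to createDataFrame while the raw data stays available for comparison.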