 

How to specify a missing value in a dataframe

I am trying to load a CSV file into a Spark DataFrame with spark-csv [1] from an Apache Zeppelin notebook. When a numeric field has no value, the parser fails for that line and the whole line gets skipped.

I would have expected the line to be loaded anyway, with the missing value set to NULL in the DataFrame so that aggregations simply ignore it.

%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
z.load("com.databricks:spark-csv_2.10:1.1.0")


%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) :: 
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data.csv")

df.describe("height").show()

Here is the content of the data file: /home/spark_user/data.csv

identifier,name,height
1,sam,184
2,cath,180
3,santa,     <-- note that there is no height recorded for Santa!

Here is the output:

+-------+------+
|summary|height|
+-------+------+
|  count|     2|    <-- only 2 of 3 lines loaded, i.e. sam and cath
|   mean| 182.0|
| stddev|   2.0|
|    min| 180.0|
|    max| 184.0|
+-------+------+

In the Zeppelin logs I can see the following error when parsing Santa's line:

ERROR [2015-07-21 16:42:09,940] ({Executor task launch worker-45} CsvRelation.scala[apply]:209) - Exception while parsing line: 3,santa,.
        java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1842)
        at sun.misc.FloatingDecimal.parseDouble(FloatingDecimal.java:110)
        at java.lang.Double.parseDouble(Double.java:538)
        at scala.collection.immutable.StringLike$class.toDouble(StringLike.scala:232)
        at scala.collection.immutable.StringOps.toDouble(StringOps.scala:31)
        at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:42)
        at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:198)
        at com.databricks.spark.csv.CsvRelation$$anonfun$com$databricks$spark$csv$CsvRelation$$parseCSV$1.apply(CsvRelation.scala:180)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:129)
        at org.apache.spark.sql.execution.Aggregate$$anonfun$doExecute$1$$anonfun$6.apply(Aggregate.scala:126)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

So you might tell me so far so good ... and you'd be right ;)

Now I want to add an extra column, say age, for which I always have data.

identifier,name,height,age
1,sam,184,30
2,cath,180,32
3,santa,,70

Now let's ask politely for some stats about age:

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) :: 
    StructField("age", DoubleType, true) :: 
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data2.csv")

df.describe("age").show()

Results

+-------+----+
|summary| age|
+-------+----+
|  count|   2|
|   mean|31.0|
| stddev| 1.0|
|    min|30.0|
|    max|32.0|
+-------+----+

ALL WRONG! Since Santa's height is not known, the whole line is dropped, and the age statistics are based only on Sam and Cath even though Santa has a perfectly valid age.

My question is: what value do I need to plug into Santa's height so that the CSV can be loaded?

The next question is more about cleaning up afterwards. I have found in the API that Spark can handle N/A values, so I thought maybe I could load my data with all columns set to StringType, do some cleanup, and only then cast the columns to the proper types, as written below:

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._
import com.databricks.spark.csv._
import org.apache.spark.sql.functions._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", StringType, true) ::
    StructField("age", StringType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data2.csv")

// e.g. for each column of my dataframe, replace the empty string by null
val cleaned = df.na.replace("*", Map("" -> null))

val toDouble = udf[Double, String]( _.toDouble )
val df2 = cleaned.withColumn("age", toDouble(cleaned("age")))

df2.describe("age").show()

But df.na.replace() throws an exception and stops:

java.lang.IllegalArgumentException: Unsupported value type java.lang.String ().
        at org.apache.spark.sql.DataFrameNaFunctions.org$apache$spark$sql$DataFrameNaFunctions$$convertToDouble(DataFrameNaFunctions.scala:417)
        at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
        at org.apache.spark.sql.DataFrameNaFunctions$$anonfun$4.apply(DataFrameNaFunctions.scala:337)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.DataFrameNaFunctions.replace0(DataFrameNaFunctions.scala:337)
        at org.apache.spark.sql.DataFrameNaFunctions.replace(DataFrameNaFunctions.scala:304)
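For completeness, the kind of cleanup I am after would look roughly like the sketch below. It is only a sketch and rests on an assumption I have not verified: that Column.cast(DoubleType), unlike my toDouble UDF, turns a string it cannot parse (such as Santa's empty height) into null instead of throwing. I would still prefer spark-csv to produce the nulls directly, hence the question.

%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

// Read every column as a string first, then cast the numeric ones.
val stringSchema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", StringType, true) ::
    StructField("age", StringType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val raw = sqlContext.read.format("com.databricks.spark.csv")
                         .schema(stringSchema)
                         .option("header", "true")
                         .load("file:///home/spark_user/data2.csv")

// Assumption: cast yields null for unparsable values instead of failing the row.
val typed = raw.withColumn("height", raw("height").cast(DoubleType))
               .withColumn("age", raw("age").cast(DoubleType))

typed.describe("age").show()     // all three rows should contribute
typed.describe("height").show()  // stats over the two known heights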

Any help & tips much appreciated!

[1] https://github.com/databricks/spark-csv

Asked Jul 21 '15 by Samuel Kerrien


1 Answer

spark-csv lacks this option. It has been fixed in the master branch; I guess you should use that or wait for the next stable version.
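For reference, a rough sketch of what that looks like once you pick up a build containing the fix. My understanding (please check the spark-csv README for the version you end up using) is that the fixed parser reads an empty field in a non-string column as null, and later releases also expose options such as nullValue / treatEmptyValuesAsNulls to control this; the version number below is only a placeholder.

%dep
z.reset()
z.addRepo("my-nexus").url("<my_local_nexus_repo_that_is_a_proxy_of_public_repos>")
// Placeholder: use whichever spark-csv release first contains the fix.
z.load("com.databricks:spark-csv_2.10:1.2.0")


%spark
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types._

val schema = StructType(
    StructField("identifier", StringType, true) ::
    StructField("name", StringType, true) ::
    StructField("height", DoubleType, true) ::
    StructField("age", DoubleType, true) ::
    Nil)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.format("com.databricks.spark.csv")
                        .schema(schema)
                        .option("header", "true")
                        .load("file:///home/spark_user/data2.csv")

// With the fix, Santa's empty height should load as null,
// so describe("age") sees all three rows.
df.describe("age").show()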

Answered Oct 23 '22 by krcz