Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Spark DataFrame not respecting schema and considering everything as String

I am facing a problem which I have failed to get over for ages now.

  1. I am on Spark 1.4 and Scala 2.10. I cannot upgrade at this moment (big distributed infrastructure)

  2. I have a file with few hundred columns, only 2 of which are string and rest all are Long. I want to convert this data into a Label/Features dataframe.

  3. I have been able to get it into LibSVM format.

  4. I just cannot get it into a Label/Features format.

The reason being

  1. I am not being able to use the toDF() as shown here https://spark.apache.org/docs/1.5.1/ml-ensembles.html

    val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
    

    it it not supported in 1.4

  2. So I first converted the txtFile into a DataFrame where I used something like this

    def getColumnDType(columnName:String):StructField = {
    
            if((columnName== "strcol1") || (columnName== "strcol2")) 
                return StructField(columnName, StringType, false)
            else
                return StructField(columnName, LongType, false)
        }
    def getDataFrameFromTxtFile(sc: SparkContext,staticfeatures_filepath: String,schemaConf: String) : DataFrame = {
            val sfRDD = sc.textFile(staticfeatures_filepath)//
            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
             // reads a space delimited string from application.properties file
            val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")
    
            // Generate the schema based on the string of schema
            val schema =
              StructType(
                schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))
    
            val data = sfRDD
            .map(line => line.split(","))
            .map(p => Row.fromSeq(p.toSeq))
    
            var df = sqlContext.createDataFrame(data, schema)
    
            //schemaString.split(" ").drop(4)
            //.map(s => df = convertColumn(df, s, "int"))
    
            return df
        }   
    

When I do a df.na.drop() df.printSchema() with this returned dataframe I get perfect Schema Like this

root
 |-- rand_entry: long (nullable = false)
 |-- strcol1: string (nullable = false)
 |-- label: long (nullable = false)
 |-- strcol2: string (nullable = false)
 |-- f1: long (nullable = false)
 |-- f2: long (nullable = false)
 |-- f3: long (nullable = false)
and so on till around f300

But - the sad part is whatever I try to do (see below) with the df, I am always getting a ClassCastException (java.lang.String cannot be cast to java.lang.Long)

val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
        .setInputCols(featureColumns)
        .setOutputCol("features")
        .transform(df)

So - the bad is - even though the schema says Long - the df is still internally considering everything as String.

Edit

Adding a simple example

Say I have a file like this

1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
1,D,7,P,-99,0,0,0,8,1,1,1,1,458759

and

sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11

(column names really do not matter so you can disregard this details)

All I am trying to do is create a Label/Features kind of dataframe where my 3rd column becomes a label and the 5th to 11th columns become a feature [Vector] column. Such that I can follow the rest of the steps in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.

I have cast the columns too like suggested by zero323

val types = Map("strCol1" -> "string", "strCol2" -> "string")
        .withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")

But the assert and VectorAssembler still fails. featureColumns = Array("f2","f3",....."f11") This is whole sequence I want to do after I have my df

    var transformeddf = new VectorAssembler()
    .setInputCols(featureColumns)
    .setOutputCol("features")
    .transform(df)

    transformeddf.show(2)

    transformeddf = new StringIndexer()
    .setInputCol("f1")
    .setOutputCol("indexedF1")
    .fit(transformeddf)
    .transform(transformeddf)

    transformeddf.show(2)

    transformeddf = new VectorIndexer()
    .setInputCol("features")
    .setOutputCol("indexedFeatures")
    .setMaxCategories(5)
    .fit(transformeddf)
    .transform(transformeddf)

The exception trace from ScalaIDE - just when it hits the VectorAssembler is as below

java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
    at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
    at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
    at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
    at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
    at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
    at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
    at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
like image 610
Run2 Avatar asked Mar 14 '16 14:03

Run2


1 Answers

You get ClassCastException because this is exactly what should happen. Schema argument is not used for automatic casting (some DataSources may use schema this way, but not methods like createDataFrame). It only declares what are the types of the values which are stored in the rows. It is you responsibility to pass data which matches the schema, not the other way around.

While DataFrame shows schema you've declared it is validated only on runtime, hence the runtime exception.If you want to transform data to specific you have cast data explicitly.

  1. First read all columns as StringType:

    val rows = sc.textFile(staticfeatures_filepath)
      .map(line => Row.fromSeq(line.split(",")))
    
    val schema = StructType(
      schemaString.split(" ").map(
        columnName => StructField(columnName, StringType, false)
      )
    )
    
    val df = sqlContext.createDataFrame(rows, schema)
    
  2. Next cast selected columns to desired type:

    import org.apache.spark.sql.types.{LongType, StringType}
    
    val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
      .withDefault(_ => LongType)
    
    val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)
    
  3. Use assembler:

    val transformeddf = new VectorAssembler()
      .setInputCols(featureColumns)
      .setOutputCol("features")
      .transform(casted)
    

You can simply steps 1 and 2 using spark-csv:

// As originally 
val schema = StructType(
  schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))


val df = sqlContext
  .read.schema(schema)
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .load(staticfeatures_filepath)

See also Correctly reading the types from file in PySpark

like image 82
zero323 Avatar answered Oct 19 '22 00:10

zero323