I am facing a problem that I have not been able to get past for ages now.
I am on Spark 1.4 and Scala 2.10, and I cannot upgrade at this moment (big distributed infrastructure).
I have a file with a few hundred columns, only 2 of which are strings and the rest are all Long. I want to convert this data into a label/features DataFrame.
I have been able to get it into LibSVM format; I just cannot get it into a label/features format.
The reason is that I cannot use toDF() as shown at https://spark.apache.org/docs/1.5.1/ml-ensembles.html
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()
because it is not supported in 1.4.
So I first converted the text file into a DataFrame, using something like this:
import org.apache.spark.sql.types.{LongType, StringType, StructField}

def getSFColumnDType(columnName: String): StructField = {
  if ((columnName == "strcol1") || (columnName == "strcol2"))
    StructField(columnName, StringType, false)
  else
    StructField(columnName, LongType, false)
}
def getDataFrameFromTxtFile(sc: SparkContext, staticfeatures_filepath: String, schemaConf: String): DataFrame = {
  val sfRDD = sc.textFile(staticfeatures_filepath)
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)

  // reads a space-delimited string from the application.properties file
  val schemaString = readConf(Array(schemaConf)).get(schemaConf).getOrElse("")

  // Generate the schema based on the schema string
  val schema = StructType(
    schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

  val data = sfRDD
    .map(line => line.split(","))
    .map(p => Row.fromSeq(p.toSeq))

  var df = sqlContext.createDataFrame(data, schema)
  // schemaString.split(" ").drop(4)
  //   .map(s => df = convertColumn(df, s, "int"))
  df
}
When I call df.na.drop() followed by df.printSchema() on the returned DataFrame, I get a perfect schema like this:
root
|-- rand_entry: long (nullable = false)
|-- strcol1: string (nullable = false)
|-- label: long (nullable = false)
|-- strcol2: string (nullable = false)
|-- f1: long (nullable = false)
|-- f2: long (nullable = false)
|-- f3: long (nullable = false)
and so on, up to around f300.
But the sad part is that whatever I try to do with the df (see below), I always get a ClassCastException (java.lang.String cannot be cast to java.lang.Long):
val featureColumns = Array("f1","f2",....."f300")
assertEquals(-99,df.select("f1").head().getLong(0))
assertEquals(-99,df.first().get(4))
val transformeddf = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .transform(df)
So the bad part is: even though the schema says Long, the df is still internally treating everything as a String.
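As a quick way to see this, you can check what the rows actually hold at runtime (a minimal sketch; index 4 is just the position used in the assert above):

// Sketch: inspect the runtime class of a value the schema claims is a Long.
// If the Row still holds the raw token from line.split(","), this prints
// java.lang.String even though printSchema() reports long.
val raw = df.first().get(4)
println(raw.getClass.getName)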
Edit
Adding a simple example.
Say I have a file like this:
1,A,20,P,-99,1,0,0,8,1,1,1,1,131153
1,B,23,P,-99,0,1,0,7,1,1,0,1,65543
1,C,24,P,-99,0,1,0,9,1,1,1,1,262149
1,D,7,P,-99,0,0,0,8,1,1,1,1,458759
and
sf-schema=f0 strCol1 f1 strCol2 f2 f3 f4 f5 f6 f7 f8 f9 f10 f11
(the column names really do not matter, so you can disregard these details)
All I am trying to do is create a label/features kind of DataFrame where my 3rd column becomes the label and the 5th to 11th columns become a feature Vector column, so that I can follow the rest of the steps in https://spark.apache.org/docs/latest/ml-classification-regression.html#tree-ensembles.
I have also cast the columns as suggested by zero323:
val types = Map("strCol1" -> "string", "strCol2" -> "string")
.withDefault(_ => "bigint")
df = df.select(df.columns.map(c => df.col(c).cast(types(c)).alias(c)): _*)
df = df.drop("f0")
df = df.drop("strCol1")
df = df.drop("strCol2")
But the assert and the VectorAssembler still fail, with featureColumns = Array("f2","f3",....."f11"). This is the whole sequence I want to run once I have my df:
var transformeddf = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .transform(df)

transformeddf.show(2)

transformeddf = new StringIndexer()
  .setInputCol("f1")
  .setOutputCol("indexedF1")
  .fit(transformeddf)
  .transform(transformeddf)

transformeddf.show(2)

transformeddf = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(5)
  .fit(transformeddf)
  .transform(transformeddf)
The exception trace from the Scala IDE, right when it hits the VectorAssembler, is as below:
java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Long
at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:110)
at scala.math.Numeric$LongIsIntegral$.toDouble(Numeric.scala:117)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToDouble$5.apply(Cast.scala:364)
at org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:436)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct$$anonfun$eval$2.apply(complexTypes.scala:75)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:75)
at org.apache.spark.sql.catalyst.expressions.CreateStruct.eval(complexTypes.scala:56)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:72)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf$$anonfun$2.apply(ScalaUdf.scala:70)
at org.apache.spark.sql.catalyst.expressions.ScalaUdf.eval(ScalaUdf.scala:960)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:118)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:68)
at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:52)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:143)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
You get a ClassCastException because this is exactly what should happen. The schema argument is not used for automatic casting (some DataSources may use the schema this way, but not methods like createDataFrame). It only declares the types of the values that are stored in the rows. It is your responsibility to pass data that matches the schema, not the other way around.
While the DataFrame shows the schema you've declared, it is validated only at runtime, hence the runtime exception. If you want to transform the data to a specific type, you have to cast the data explicitly.
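To see the point in isolation, here is a minimal sketch (using the sc and sqlContext already in scope in the question; the column name and value are made up): createDataFrame happily accepts rows that do not match the declared schema, and nothing fails until an action forces evaluation.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Declared type is Long, but the row actually holds a String.
val badRows = sc.parallelize(Seq(Row("42")))
val badSchema = StructType(Seq(StructField("f1", LongType, false)))

val badDf = sqlContext.createDataFrame(badRows, badSchema)  // no error here
badDf.printSchema()                                         // claims f1: long
// badDf.select("f1").head().getLong(0)                     // fails at runtime with ClassCastException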
First read all columns as StringType:
val rows = sc.textFile(staticfeatures_filepath)
  .map(line => Row.fromSeq(line.split(",")))

val schema = StructType(
  schemaString.split(" ").map(
    columnName => StructField(columnName, StringType, false)
  )
)

val df = sqlContext.createDataFrame(rows, schema)
Next cast the selected columns to the desired type:
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{LongType, StringType}

val types = Map("strcol1" -> StringType, "strcol2" -> StringType)
  .withDefault(_ => LongType)

val casted = df.select(df.columns.map(c => col(c).cast(types(c)).alias(c)): _*)
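At this point the assert from the question should pass; a quick check (reusing df and the f1 column from the question):

casted.printSchema()                            // long columns now report long
println(casted.select("f1").head().getLong(0))  // returns a Long instead of throwing ClassCastException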
Use assembler:
val transformeddf = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("features")
  .transform(casted)
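If all you need afterwards is the label/features shape described in the question, a short sketch (treating f1 as the label, as in the edited example; adjust the column name to your schema):

val labeledData = transformeddf.select(col("f1").alias("label"), col("features"))
labeledData.show(2)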
You can simplify steps 1 and 2 using spark-csv:
// As originally
val schema = StructType(
  schemaString.split(" ").map(fieldName => getSFColumnDType(fieldName)))

val df = sqlContext
  .read.schema(schema)
  .format("com.databricks.spark.csv")
  .option("header", "false")
  .load(staticfeatures_filepath)
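Note that spark-csv is a separate package on Spark 1.x, so it has to be on the classpath. For example, in sbt (the exact version below is an assumption; pick one published for Spark 1.4 / Scala 2.10):

// build.sbt
libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.4.0"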
See also Correctly reading the types from file in PySpark