I have an RDD in Spark where the objects are based on a case class:
ExampleCaseClass(user: User, stuff: Stuff)
I want to use Spark's ML pipeline, so I convert this to a Spark data frame. As part of the pipeline, I want to transform one of the columns into a column whose entries are vectors. Since I want the length of that vector to vary with the model, it should be built into the pipeline as part of the feature transformation.
So I attempted to define a Transformer as follows:
class MyTransformer extends Transformer {
  val uid = ""
  val num: IntParam = new IntParam(this, "", "")
  def setNum(value: Int): this.type = set(num, value)
  setDefault(num -> 50)

  def transform(df: DataFrame): DataFrame = {
    ...
  }

  def transformSchema(schema: StructType): StructType = {
    val inputFields = schema.fields
    StructType(inputFields :+ StructField("colName", ???, true))
  }

  def copy(extra: ParamMap): Transformer = defaultCopy(extra)
}
How do I specify the DataType of the resulting field (i.e. fill in the ???)? It will be a Vector of some simple class (Boolean, Int, Double, etc). It seems VectorUDT might have worked, but that's private to Spark. Since any RDD can be converted to a DataFrame, any case class can be converted to a custom DataType. However, I can't figure out how to do this conversion manually; otherwise I could apply it to some simple case class wrapping the vector.
Furthermore, if I specify a vector type for the column, will VectorAssembler correctly process the vector into separate features when I go to fit the model?
I'm still new to Spark, and especially to the ML Pipeline, so I'd appreciate any advice.
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType

def transformSchema(schema: StructType): StructType = {
  val inputFields = schema.fields
  StructType(inputFields :+ StructField("colName", VectorType, true))
}
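To show how that schema fits into a whole transformer, here is a minimal sketch, assuming the Spark 2.x API (where transform takes a Dataset[_] rather than a DataFrame). The input column name "value" and the vector-filling logic are hypothetical placeholders, since the question does not show the body of transform. The uid-based constructor is included because, as far as I know, defaultCopy looks up a constructor that takes the uid as a String.

```scala
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{StructField, StructType}

// Hypothetical transformer: repeats the numeric column "value" into a
// dense vector of length `num`, stored in a new column "colName".
class MyTransformer(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("myTransformer"))

  val num: IntParam = new IntParam(this, "num", "length of the output vector")
  def setNum(value: Int): this.type = set(num, value)
  setDefault(num -> 50)

  override def transform(dataset: Dataset[_]): DataFrame = {
    // Read the param outside the closure so the UDF only captures an Int.
    val n = $(num)
    // Placeholder logic (an assumption): fill the vector with the input value.
    val toVector = udf((x: Double) => Vectors.dense(Array.fill(n)(x)))
    dataset.withColumn("colName", toVector(col("value").cast("double")))
  }

  // Declares the new column with the public alias for VectorUDT.
  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField("colName", VectorType, nullable = true))

  override def copy(extra: ParamMap): MyTransformer = defaultCopy(extra)
}
```

With a DataFrame df that has a numeric "value" column, new MyTransformer().setNum(3).transform(df) should add a "colName" column whose schema type is VectorType and whose vectors have length 3.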
In Spark 2.1, SQLDataTypes.VectorType exposes the otherwise private VectorUDT publicly (the object is annotated as available since 2.0.0):
package org.apache.spark.ml.linalg

import org.apache.spark.annotation.{DeveloperApi, Since}
import org.apache.spark.sql.types.DataType

/**
 * :: DeveloperApi ::
 * SQL data types for vectors and matrices.
 */
@Since("2.0.0")
@DeveloperApi
object SQLDataTypes {

  /** Data type for [[Vector]]. */
  val VectorType: DataType = new VectorUDT

  /** Data type for [[Matrix]]. */
  val MatrixType: DataType = new MatrixUDT
}
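Another way to find the type is to wrap the vector in a simple case class, convert it to a DataFrame, and inspect the resulting schema. This example uses the older org.apache.spark.mllib.linalg package and assumes a spark-shell session, where the implicits needed for toDF are already imported:

import org.apache.spark.mllib.linalg.{Vector, Vectors}

// Simple case class wrapping the vector
case class MyVector(vector: Vector)

val vectorDF = Seq(
  MyVector(Vectors.dense(1.0, 3.4, 4.4)),
  MyVector(Vectors.dense(5.5, 6.7))
).toDF

vectorDF.printSchema
root
 |-- vector: vector (nullable = true)

// Print the full definition of the column's data type
println(vectorDF.schema.fields(0).dataType.prettyJson)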
{
  "type" : "udt",
  "class" : "org.apache.spark.mllib.linalg.VectorUDT",
  "pyClass" : "pyspark.mllib.linalg.VectorUDT",
  "sqlType" : {
    "type" : "struct",
    "fields" : [ {
      "name" : "type",
      "type" : "byte",
      "nullable" : false,
      "metadata" : { }
    }, {
      "name" : "size",
      "type" : "integer",
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "indices",
      "type" : {
        "type" : "array",
        "elementType" : "integer",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    }, {
      "name" : "values",
      "type" : {
        "type" : "array",
        "elementType" : "double",
        "containsNull" : false
      },
      "nullable" : true,
      "metadata" : { }
    } ]
  }
}
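On the VectorAssembler part of the question: VectorAssembler accepts numeric, boolean, and vector input columns and concatenates them into a single feature vector, so a vector column should be flattened into individual features. Below is a minimal sketch, assuming Spark 2.x with the org.apache.spark.ml.linalg package and a local SparkSession; the column names "x", "vec", and "features" and the data are hypothetical. The vectors here have the same length in every row, which a model presumably needs in order to be fit.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[1]")
  .appName("assembler-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical data: one plain numeric column and one vector column.
val df = Seq(
  (1.0, Vectors.dense(3.4, 4.4)),
  (2.0, Vectors.dense(5.5, 6.7))
).toDF("x", "vec")

// Concatenate the numeric column and the vector column into "features".
val assembler = new VectorAssembler()
  .setInputCols(Array("x", "vec"))
  .setOutputCol("features")

val assembled = assembler.transform(df)

// The first row's features hold x followed by the elements of vec.
val firstFeatures = assembled.head.getAs[Vector]("features")
println(firstFeatures)
```

If the vector column comes from a custom transformer, the same assembler stage can follow it in the pipeline, as long as the column's declared type is the vector type from the same linalg package.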