Context: I have a data frame with two columns: label, and features.
org.apache.spark.sql.DataFrame = [label: int, features: vector]
Where features is an mllib.linalg.VectorUDT of numeric type built using VectorAssembler.
Question: Is there a way to assign a schema to the features vector? I want to keep track of the name of each feature.
Tried so far:
import org.apache.spark.ml.attribute._

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("feat1", "feat2", "feat3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])
scala> attrGroup.toMetadata
res197: org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"feat1"},{"idx":1,"name":"feat2"},{"idx":2,"name":"feat3"}]},"num_attrs":3}}
But was not sure how to apply this to an existing data frame.
There are at least two options:
On an existing DataFrame, you can use the as method with a metadata argument:
import org.apache.spark.ml.attribute._
import org.apache.spark.ml.linalg.Vectors  // use org.apache.spark.mllib.linalg.Vectors on Spark 1.x

val rdd = sc.parallelize(Seq(
  (1, Vectors.dense(1.0, 2.0, 3.0))
))
val df = rdd.toDF("label", "features")

// Re-alias the column with the attribute metadata attached;
// the alias name itself is ignored because withColumn sets the name
df.withColumn("features", $"features".as("_", attrGroup.toMetadata))
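You can verify that the metadata was attached by reading it back from the schema (withMeta below is just an illustrative name for the result):

val withMeta = df.withColumn("features", $"features".as("_", attrGroup.toMetadata))

withMeta.schema("features").metadata
// {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"feat1"},
//   {"idx":1,"name":"feat2"},{"idx":2,"name":"feat3"}]},"num_attrs":3}}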
When you create a new DataFrame, convert the AttributeGroup to a StructField with toStructField and use it as the schema for the vector column:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
import org.apache.spark.sql.Row

val schema = StructType(Array(
  StructField("label", IntegerType, false),
  attrGroup.toStructField()
))

spark.createDataFrame(
  rdd.map(row => Row.fromSeq(row.productIterator.toSeq)),
  schema)
If the vector column has been created using VectorAssembler, column metadata describing the parent columns should already be attached:
import org.apache.spark.ml.feature.VectorAssembler

val raw = sc.parallelize(Seq(
  (1, 1.0, 2.0, 3.0)
)).toDF("id", "feat1", "feat2", "feat3")

val assembler = new VectorAssembler()
  .setInputCols(Array("feat1", "feat2", "feat3"))
  .setOutputCol("features")

val dfWithMeta = assembler.transform(raw).select($"id", $"features")
dfWithMeta.schema.fields(1).metadata
// org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{"numeric":[
//   {"idx":0,"name":"feat1"},{"idx":1,"name":"feat2"},
//   {"idx":2,"name":"feat3"}]},"num_attrs":3}}
Vector fields are not directly accessible using dot syntax (like $"features.feat1") but can be used by specialized tools like VectorSlicer:
import org.apache.spark.ml.feature.VectorSlicer

val slicer = new VectorSlicer()
  .setInputCol("features")
  .setOutputCol("featuresSubset")
  .setNames(Array("feat1", "feat3"))
slicer.transform(dfWithMeta).show
// +---+-------------+--------------+
// | id| features|featuresSubset|
// +---+-------------+--------------+
// | 1|[1.0,2.0,3.0]| [1.0,3.0]|
// +---+-------------+--------------+
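VectorSlicer can also select by position with setIndices, which works even when the column carries no name metadata. A sketch, reusing dfWithMeta from above (byIndex is just an illustrative name):

// Select the 1st and 3rd features by position instead of by name
val byIndex = new VectorSlicer()
  .setInputCol("features")
  .setOutputCol("featuresSubset")
  .setIndices(Array(0, 2))

byIndex.transform(dfWithMeta).show
// same output as above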
For PySpark, see How can I declare a Column as a categorical feature in a DataFrame for use in ml.