 

Attach metadata to vector column in Spark

Context: I have a data frame with two columns: label and features.

org.apache.spark.sql.DataFrame = [label: int, features: vector]

Where features is an mllib.linalg.VectorUDT of numeric type built using VectorAssembler.

Question: Is there a way to assign a schema to the features vector? I want to keep track of the name of each feature.

Tried so far:

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("feat1", "feat2", "feat3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

scala> attrGroup.toMetadata 
res197: org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"feat1"},{"idx":1,"name":"feat2"},{"idx":2,"name":"feat3"}]},"num_attrs":3}}

But I was not sure how to apply this to an existing data frame.

asked Feb 10 '16 by gstvolvr


1 Answer

There are at least two options:

  1. On an existing DataFrame you can use the as method with a metadata argument (see the verification sketch after this list):

    import org.apache.spark.ml.attribute._
    import org.apache.spark.mllib.linalg.Vectors
    
    val rdd = sc.parallelize(Seq(
      (1, Vectors.dense(1.0, 2.0, 3.0))
    ))
    val df = rdd.toDF("label", "features")
    
    // Re-alias the column, attaching the attribute group as column metadata
    df.withColumn("features", $"features".as("features", attrGroup.toMetadata))
    
  2. When you create a new DataFrame, convert the AttributeGroup to a StructField and use it as the schema for the given column:

    import org.apache.spark.sql.types.{StructType, StructField, IntegerType}
    import org.apache.spark.sql.Row
    
    val schema = StructType(Array(
      StructField("label", IntegerType, false),
      attrGroup.toStructField()
    ))
    
    spark.createDataFrame(
      rdd.map(row => Row.fromSeq(row.productIterator.toSeq)),
      schema)
    

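Whichever option you use, you can confirm that the metadata was attached by inspecting the StructField of the vector column. A minimal verification sketch for option 1 (dfMeta is a hypothetical name introduced here):

import org.apache.spark.sql.types.Metadata

val dfMeta = df.withColumn("features", $"features".as("features", attrGroup.toMetadata))

// The metadata lives on the column's StructField
val meta: Metadata = dfMeta.schema("features").metadata
// {"ml_attr":{"attrs":{"numeric":[{"idx":0,"name":"feat1"},
//   {"idx":1,"name":"feat2"},{"idx":2,"name":"feat3"}]},"num_attrs":3}}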
If the vector column has been created using VectorAssembler, column metadata describing the parent columns should already be attached.

import org.apache.spark.ml.feature.VectorAssembler

val raw = sc.parallelize(Seq(
  (1, 1.0, 2.0, 3.0)
)).toDF("id", "feat1", "feat2", "feat3")

val assembler = new VectorAssembler()
  .setInputCols(Array("feat1", "feat2", "feat3"))
  .setOutputCol("features")

val dfWithMeta = assembler.transform(raw).select($"id", $"features")
dfWithMeta.schema.fields(1).metadata

// org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{"numeric":[
//   {"idx":0,"name":"feat1"},{"idx":1,"name":"feat2"},
//   {"idx":2,"name":"feat3"}]},"num_attrs":3}}

Vector fields are not directly accessible using dot syntax (like $"features.feat1") but can be used by specialized tools like VectorSlicer:

import org.apache.spark.ml.feature.VectorSlicer

val slicer = new VectorSlicer()
  .setInputCol("features")
  .setOutputCol("featuresSubset")
  .setNames(Array("feat1", "feat3"))

slicer.transform(dfWithMeta).show
// +---+-------------+--------------+
// | id|     features|featuresSubset|
// +---+-------------+--------------+
// |  1|[1.0,2.0,3.0]|     [1.0,3.0]|
// +---+-------------+--------------+
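If you later need the feature names back as plain strings (for example, to match model coefficients to the original columns), the metadata can be parsed with AttributeGroup.fromStructField. A minimal sketch (group and featureNames are hypothetical names):

import org.apache.spark.ml.attribute.AttributeGroup

// Rebuild the attribute group from the vector column's StructField
val group = AttributeGroup.fromStructField(dfWithMeta.schema("features"))

// attributes is an Option[Array[Attribute]]; extract each attribute's name
val featureNames = group.attributes.map(_.flatMap(_.name).toSeq)
// names: feat1, feat2, feat3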

For PySpark, see How can I declare a Column as a categorical feature in a DataFrame for use in ml.

answered Sep 26 '22 by zero323