Spark Scala: How to convert DataFrame[Vector] to DataFrame[f1: Double, ..., fn: Double]

I just used StandardScaler to normalize my features for an ML application. After selecting the scaled features, I want to convert them back to a DataFrame of Doubles, though the length of my vectors is arbitrary. I know how to do it for a specific set of 3 features by using

myDF.map{case Row(v: Vector) => (v(0), v(1), v(2))}.toDF("f1", "f2", "f3")

but not for an arbitrary number of features. Is there an easy way to do this?

Example:

val testDF = sc.parallelize(List(Vectors.dense(5D, 6D, 7D), Vectors.dense(8D, 9D, 10D), Vectors.dense(11D, 12D, 13D))).map(Tuple1(_)).toDF("scaledFeatures")
val myColumnNames = List("f1", "f2", "f3")
// val finalDF = DataFrame[f1: Double, f2: Double, f3: Double] 

EDIT

I found out how to unpack the column names when creating the DataFrame, but I am still having trouble converting a vector to the sequence needed to create the DataFrame:

finalDF = testDF.map{case Row(v: Vector) => v.toArray.toSeq /* <= this errors */}.toDF(List("f1", "f2", "f3"): _*)
asked Jun 29 '16 by mt88



4 Answers

Spark >= 3.0.0

Since Spark 3.0 you can use vector_to_array:

import org.apache.spark.ml.functions.vector_to_array

// exprs is the per-element column list built below (see the Spark < 3.0.0 section)
testDF.select(vector_to_array($"scaledFeatures").alias("_tmp")).select(exprs: _*)
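
For reference, a self-contained sketch of the same idea applied to the question's testDF, assuming Spark 3.x, import spark.implicits._ in scope, and the f1...fn column names from the question (the element count is read from the first row):

import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.linalg.Vector

// Number of elements in the vector column, taken from the first row
val numFeatures = testDF.first.getAs[Vector]("scaledFeatures").size

// One Double column per vector element, named f1 ... fn as in the question
val namedExprs = (0 until numFeatures).map(i => $"_tmp".getItem(i).alias(s"f${i + 1}"))

val finalDF = testDF
  .select(vector_to_array($"scaledFeatures").alias("_tmp"))
  .select(namedExprs: _*)
// finalDF: DataFrame[f1: double, f2: double, f3: double]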

Spark < 3.0.0

One possible approach is something similar to this:

import org.apache.spark.sql.functions.udf

// In Spark 1.x you'll have to replace the ML Vector with the MLlib one
// import org.apache.spark.mllib.linalg.Vector
// In 2.x the below is usually the right choice
import org.apache.spark.ml.linalg.Vector

// Get size of the vector
val n = testDF.first.getAs[Vector](0).size

// Simple helper to convert a Vector to array<double>
// asNondeterministic is available in Spark 2.3 or later
// It can be removed, but at the cost of decreased performance
val vecToSeq = udf((v: Vector) => v.toArray).asNondeterministic

// Prepare a list of columns to create
val exprs = (0 until n).map(i => $"_tmp".getItem(i).alias(s"f$i"))

testDF.select(vecToSeq($"scaledFeatures").alias("_tmp")).select(exprs:_*)

If you know a list of columns upfront you can simplify this a little:

val cols: Seq[String] = ???
val exprs = cols.zipWithIndex.map{ case (c, i) => $"_tmp".getItem(i).alias(c) }
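
Applied to the question's example, a minimal sketch (assuming the testDF and myColumnNames from the question and the vecToSeq udf defined above) would be:

// Name the extracted columns f1, f2, f3 as in the question
val namedExprs = myColumnNames.zipWithIndex.map { case (c, i) => $"_tmp".getItem(i).alias(c) }

val finalDF = testDF
  .select(vecToSeq($"scaledFeatures").alias("_tmp"))
  .select(namedExprs: _*)
// finalDF: DataFrame[f1: double, f2: double, f3: double]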

For Python equivalent see How to split Vector into columns - using PySpark.

answered by zero323


Please try VectorSlicer:

import org.apache.spark.ml.feature.{VectorAssembler, VectorSlicer}
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((1, 0.2, 0.8), (2, 0.1, 0.9), (3, 0.3, 0.7))
).toDF("id", "negative_logit", "positive_logit")


val assembler = new VectorAssembler()
  .setInputCols(Array("negative_logit", "positive_logit"))
  .setOutputCol("prediction")

val output = assembler.transform(dataset)
output.show()
/*
+---+--------------+--------------+----------+
| id|negative_logit|positive_logit|prediction|
+---+--------------+--------------+----------+
|  1|           0.2|           0.8| [0.2,0.8]|
|  2|           0.1|           0.9| [0.1,0.9]|
|  3|           0.3|           0.7| [0.3,0.7]|
+---+--------------+--------------+----------+
*/

val slicer = new VectorSlicer()
  .setInputCol("prediction")
  .setIndices(Array(1))
  .setOutputCol("positive_prediction")

val posi_output = slicer.transform(output)
posi_output.show()

/*
+---+--------------+--------------+----------+-------------------+
| id|negative_logit|positive_logit|prediction|positive_prediction|
+---+--------------+--------------+----------+-------------------+
|  1|           0.2|           0.8| [0.2,0.8]|              [0.8]|
|  2|           0.1|           0.9| [0.1,0.9]|              [0.9]|
|  3|           0.3|           0.7| [0.3,0.7]|              [0.7]|
+---+--------------+--------------+----------+-------------------+
*/
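
Note that VectorSlicer still returns a vector column (here a one-element vector), so getting a plain Double column takes one more extraction step. A minimal sketch, assuming Spark 3.x's vector_to_array (on older versions the udf approach from the accepted answer works the same way):

import org.apache.spark.ml.functions.vector_to_array

// Pull the single sliced element out as a plain Double column
val posi_double = posi_output
  .withColumn("positive_prediction", vector_to_array($"positive_prediction").getItem(0))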
answered by Tong


An alternate solution that evolved a couple of days ago: import the VectorDisassembler into your project (as long as it's not merged into Spark), then:

import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 1.2, 1.3), (1, 2.2, 2.3), (2, 3.2, 3.3))
).toDF("id", "val1", "val2")


val assembler = new VectorAssembler()
  .setInputCols(Array("val1", "val2"))
  .setOutputCol("vectorCol")

val output = assembler.transform(dataset)
output.show()
/*
+---+----+----+---------+
| id|val1|val2|vectorCol|
+---+----+----+---------+
|  0| 1.2| 1.3|[1.2,1.3]|
|  1| 2.2| 2.3|[2.2,2.3]|
|  2| 3.2| 3.3|[3.2,3.3]|
+---+----+----+---------+*/

val disassembler = new org.apache.spark.ml.feature.VectorDisassembler()
  .setInputCol("vectorCol")
disassembler.transform(output).show()
/*
+---+----+----+---------+----+----+
| id|val1|val2|vectorCol|val1|val2|
+---+----+----+---------+----+----+
|  0| 1.2| 1.3|[1.2,1.3]| 1.2| 1.3|
|  1| 2.2| 2.3|[2.2,2.3]| 2.2| 2.3|
|  2| 3.2| 3.3|[3.2,3.3]| 3.2| 3.3|
+---+----+----+---------+----+----+*/
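
If the duplicated source columns are not wanted in the result, one option is to drop them before disassembling; a small sketch, assuming VectorDisassembler names its output columns from the metadata stored in vectorCol (as the example output above suggests):

// Drop the original scalar columns so only the disassembled ones remain
disassembler.transform(output.drop("val1", "val2")).show()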
answered by Boern


I use Spark 2.3.2 and built an xgboost4j binary-classification model; the result looks like this:

results_train.select("classIndex","probability","prediction").show(3,0)
+----------+----------------------------------------+----------+
|classIndex|probability                             |prediction|
+----------+----------------------------------------+----------+
|1         |[0.5998525619506836,0.400147408246994]  |0.0       |
|1         |[0.5487841367721558,0.45121586322784424]|0.0       |
|0         |[0.5555324554443359,0.44446757435798645]|0.0       |

I define the following udf to get the elements out of the vector column probability:

import org.apache.spark.sql.functions._

def getProb = udf((probV: org.apache.spark.ml.linalg.Vector, clsInx: Int) => probV.apply(clsInx) )

results_train.select("classIndex","probability","prediction").
withColumn("p_0",getProb($"probability",lit(0))).
withColumn("p_1",getProb($"probability", lit(1))).show(3,0)

+----------+----------------------------------------+----------+------------------+-------------------+
|classIndex|probability                             |prediction|p_0               |p_1                |
+----------+----------------------------------------+----------+------------------+-------------------+
|1         |[0.5998525619506836,0.400147408246994]  |0.0       |0.5998525619506836|0.400147408246994  |
|1         |[0.5487841367721558,0.45121586322784424]|0.0       |0.5487841367721558|0.45121586322784424|
|0         |[0.5555324554443359,0.44446757435798645]|0.0       |0.5555324554443359|0.44446757435798645|
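
If the model has more classes, the same udf can be applied in a loop. A minimal sketch, assuming the number of classes is known up front (numClasses here is a hypothetical value):

// Hypothetical: two classes, as in the binary model above
val numClasses = 2

// Add one probability column per class index using the getProb udf defined above
val withProbs = (0 until numClasses).foldLeft(
  results_train.select("classIndex", "probability", "prediction")
) { case (df, i) => df.withColumn(s"p_$i", getProb($"probability", lit(i))) }

withProbs.show(3, 0)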

Hope this helps those who work with Vector-type input.

answered by Yuehan Lyu