 

Scala - How to split the probability column (a column of vectors) obtained when fitting a GMM model into two separate columns? [duplicate]

I am trying to do the following. I start with this DataFrame:

+-----+-------------------------+----------+-------------------------------------------+
|label|features                 |prediction|probability                                |
+-----+-------------------------+----------+-------------------------------------------+
|0.0  |(3,[],[])                |0         |[0.9999999999999979,2.093996169658831E-15] |
|1.0  |(3,[0,1,2],[0.1,0.1,0.1])|0         |[0.999999999999999,9.891337521299582E-16]  |
|2.0  |(3,[0,1,2],[0.2,0.2,0.2])|0         |[0.9999999999999979,2.0939961696578572E-15]|
|3.0  |(3,[0,1,2],[9.0,9.0,9.0])|1         |[2.093996169659668E-15,0.9999999999999979] |
|4.0  |(3,[0,1,2],[9.1,9.1,9.1])|1         |[9.89133752128275E-16,0.999999999999999]   |
|5.0  |(3,[0,1,2],[9.2,9.2,9.2])|1         |[2.0939961696605603E-15,0.9999999999999979]|
+-----+-------------------------+----------+-------------------------------------------+

I want to convert the above DataFrame to have two more columns, prob1 and prob2, each holding the corresponding value from the probability column.

I found similar questions - one in PySpark and the other in Scala. I do not know how to translate the PySpark code, and I am getting an error with the Scala code.

PySpark Code:

split1_udf = udf(lambda value: value[0].item(), FloatType())
split2_udf = udf(lambda value: value[1].item(), FloatType())

output2 = randomforestoutput.select(split1_udf('probability').alias('c1'), split2_udf('probability').alias('c2'))

Or to append these columns to the original dataframe:

randomforestoutput.withColumn('c1', split1_udf('probability')).withColumn('c2', split2_udf('probability'))

Scala Code:

import org.apache.spark.sql.functions.udf

val getPOne = udf((v: org.apache.spark.mllib.linalg.Vector) => v(1))
model.transform(testDf).select(getPOne($"probability"))

I get the following error when I run the Scala code:

scala> predictions.select(getPOne(col("probability"))).show(false)
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(probability)' due to data type mismatch: argument 1 requires vector type, however, '`probability`' is of vector type.;;
'Project [UDF(probability#39) AS UDF(probability)#135]
+- Project [label#0, features#1, prediction#34, UDF(features#1) AS probability#39]
   +- Project [label#0, features#1, UDF(features#1) AS prediction#34]
      +- Relation[label#0,features#1] libsvm

I am currently using Scala 2.11.11 and Spark 2.1.1.

modin3956 asked Jun 13 '17




1 Answer

What I understand from your question is that you are trying to split the probability column into two columns, prob1 and prob2. If that's the case, then simple array indexing with withColumn should solve your issue.

predictions
  .withColumn("prob1", $"probability"(0))
  .withColumn("prob2", $"probability"(1))
  .drop("probability")
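The $"probability"(0) syntax above is shorthand for Column.getItem. If you prefer explicit column functions, the same indexing can be written as in this self-contained sketch (it assumes toy data where probability is an ArrayType column, not the question's Vector column):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Hypothetical data: probability stored as an array<double> column
val predictions = Seq(Seq(0.1, 0.9), Seq(0.8, 0.2)).toDF("probability")

// getItem(i) is equivalent to $"probability"(i)
val result = predictions
  .withColumn("prob1", col("probability").getItem(0))
  .withColumn("prob2", col("probability").getItem(1))
  .drop("probability")
```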

You can find many more functions that can be applied to DataFrame columns in the org.apache.spark.sql.functions package.

Edit

I created a temporary DataFrame to match your column:

val predictions = Seq(Array(1.0,2.0), Array(2.0939961696605603E-15,0.9999999999999979), Array(Double.NaN,Double.NaN)).toDF("probability")
+--------------------------------------------+
|probability                                 |
+--------------------------------------------+
|[1.0, 2.0]                                  |
|[2.0939961696605603E-15, 0.9999999999999979]|
|[NaN, NaN]                                  |
+--------------------------------------------+

Applying the withColumn calls above gives:

+----------------------+------------------+
|prob1                 |prob2             |
+----------------------+------------------+
|1.0                   |2.0               |
|2.0939961696605603E-15|0.9999999999999979|
|NaN                   |NaN               |
+----------------------+------------------+

Schema mismatch Edit

Since the Vector schema of your probability column does not match the ArrayType schema assumed above, that solution will not work in your case. Please use the following instead.

You will have to create UDF functions that extract each element. Make sure the UDF parameter is the new org.apache.spark.ml.linalg.Vector type; using the older org.apache.spark.mllib.linalg.Vector (as in the question's code) is what triggers the confusing "argument 1 requires vector type, however, '`probability`' is of vector type" error, because the two Vector types share a display name but are distinct.

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val first = udf((v: Vector) => v.toArray(0))
val second = udf((v: Vector) => v.toArray(1))
predictions
  .withColumn("prob1", first($"probability"))
  .withColumn("prob2", second($"probability"))
  .drop("probability")
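As a side note, on newer Spark versions (3.0+, so not the 2.1.1 used in the question), the built-in vector_to_array function converts a Vector column to an array column, avoiding hand-written UDFs entirely. A sketch with toy data:

```scala
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Hypothetical data: a Vector column like the GMM's probability output
val predictions = Seq(
  Tuple1(Vectors.dense(0.1, 0.9)),
  Tuple1(Vectors.dense(0.8, 0.2))
).toDF("probability")

// vector_to_array (Spark 3.0+) yields an array<double>, which supports getItem
val result = predictions
  .withColumn("probArr", vector_to_array(col("probability")))
  .withColumn("prob1", col("probArr").getItem(0))
  .withColumn("prob2", col("probArr").getItem(1))
  .drop("probability", "probArr")
```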

I hope you get the desired result.

Ramesh Maharjan answered Sep 23 '22