 

How to compose column name using another column's value for withColumn in Scala Spark

I'm trying to add a new column to a DataFrame. The value of this column is the value of another column whose name depends on other columns from the same DataFrame.

For instance, given this:

+---+---+----+----+
|  A|  B| A_1| B_2|
+---+---+----+----+
|  A|  1| 0.1| 0.3|
|  B|  2| 0.2| 0.4|
+---+---+----+----+

I'd like to obtain this:

+---+---+----+----+----+
|  A|  B| A_1| B_2|   C|
+---+---+----+----+----+
|  A|  1| 0.1| 0.3| 0.1|
|  B|  2| 0.2| 0.4| 0.4|
+---+---+----+----+----+

That is, I added column C, whose value comes from either column A_1 or B_2. The name of the source column comes from concatenating the values of columns A and B.

I know that I can add a new column based on another and a constant like this:

df.withColumn("C", $"B" + 1)

I also know that the name of the column can come from a variable like this:

val name = "A_1"
df.withColumn("C", col(name) + 1)

However, what I'd like to do is something like this:

df.withColumn("C", col(s"${col("A")}_${col("B")}"))

Which doesn't work.

NOTE: I'm coding in Scala 2.11 and Spark 2.2.

asked Jan 09 '18 by rgarc101


People also ask

How do I combine two columns in Spark?

Spark SQL provides the concat() function to concatenate two or more DataFrame columns into a single column. It can also take columns of different data types, for example String, Int, Boolean, and even arrays, and concatenate them into a single column.
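
For instance, a minimal Scala sketch (the DataFrame and column names here are illustrative assumptions):

import org.apache.spark.sql.functions.{col, concat, lit}

// hypothetical DataFrame `people` with string columns "first" and "last"
val withFullName = people.withColumn("full_name", concat(col("first"), lit(" "), col("last")))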

How do I add a column to a DataFrame from another DataFrame in PySpark?

In PySpark, to add a new column with a constant value, use the lit() function (from pyspark.sql.functions import lit). lit() takes the constant value you want to add and returns a Column; to add a NULL / None value, use lit(None).
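
The same idea in Scala (the language used in this question), as a rough sketch on a hypothetical DataFrame df:

import org.apache.spark.sql.functions.lit

// add a constant column, and a NULL column cast to an explicit type
val withDefaults = df.withColumn("source", lit("csv")).withColumn("note", lit(null).cast("string"))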

How do I rename a column in Scala Spark?

Spark has a withColumnRenamed() function on DataFrame to change a column name. This is the most straightforward approach; the function takes two parameters: the first is the existing column name and the second is the new column name you want.
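
A minimal sketch, assuming a DataFrame df with a column named "A_1":

// rename column "A_1" to "a1"; the original DataFrame is left unchanged
val renamed = df.withColumnRenamed("A_1", "a1")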


2 Answers

You can achieve this by writing a udf function. I suggest a udf because your requirement is to process the DataFrame row by row, whereas the built-in functions operate column by column.

But before that you need the array of column names:

val columns = df.columns

Then write a udf function as

import scala.collection.mutable
import org.apache.spark.sql.functions._
// picks the value of the column named A + "_" + B from the array of row values
def getValue = udf((A: String, B: String, array: mutable.WrappedArray[String]) => array(columns.indexOf(A + "_" + B)))

where

A is the value of the first column
B is the value of the second column
array is the array of all the column values

Now just call the udf function using the withColumn API:

df.withColumn("C", getValue($"A", $"B", array(columns.map(col): _*))).show(false)

You should get the desired output DataFrame.
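
Putting it together, a runnable sketch of this approach, assuming (as in the example) that all column values can be treated as strings so the row values can be collected into a single array:

import scala.collection.mutable
import org.apache.spark.sql.functions.{array, col, udf}
import spark.implicits._ // assumes a SparkSession named spark

val df = Seq(
  ("A", "1", "0.1", "0.3"),
  ("B", "2", "0.2", "0.4")
).toDF("A", "B", "A_1", "B_2")

val columns = df.columns

// pick the value whose column name is a + "_" + b from the array of row values
val getValue = udf((a: String, b: String, values: mutable.WrappedArray[String]) =>
  values(columns.indexOf(a + "_" + b)))

df.withColumn("C", getValue(col("A"), col("B"), array(columns.map(col): _*))).show(false)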

answered Oct 23 '22 by Ramesh Maharjan


You can select from a map. Define a map that translates column names to column values:

import org.apache.spark.sql.functions.{col, concat_ws, lit, map}

val dataMap = map(
  df.columns.diff(Seq("A", "B")).flatMap(c => lit(c) :: col(c) :: Nil): _*
)

df.select(dataMap).show(false)
+---------------------------+
|map(A_1, A_1, B_2, B_2)    |
+---------------------------+
|Map(A_1 -> 0.1, B_2 -> 0.3)|
|Map(A_1 -> 0.2, B_2 -> 0.4)|
+---------------------------+

and select from it with apply:

df.withColumn("C", dataMap(concat_ws("_", $"A", $"B"))).show
+---+---+---+---+---+
|  A|  B|A_1|B_2|  C|
+---+---+---+---+---+
|  A|  1|0.1|0.3|0.1|
|  B|  2|0.2|0.4|0.4|
+---+---+---+---+---+

You can also try mapping, but I suspect it won't perform well with very wide data:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val outputEncoder = RowEncoder(df.schema.add(StructField("C", DoubleType)))

df.map(row => {
   val a = row.getAs[String]("A")
   val b = row.getAs[String]("B")
   val key = s"${a}_${b}"
   Row.fromSeq(row.toSeq :+ row.getAs[Double](key))
})(outputEncoder).show
+---+---+---+---+---+
|  A|  B|A_1|B_2|  C|
+---+---+---+---+---+
|  A|  1|0.1|0.3|0.1|
|  B|  2|0.2|0.4|0.4|
+---+---+---+---+---+

and in general I wouldn't recommend this approach.

If the data comes from CSV, you might consider skipping the default CSV reader and using custom logic to push the column selection directly into the parsing process. In pseudocode:

spark.read.text(...).map { line => {
  val a = ???  // parse A
  val b = ???  // parse B
  val c = ???  // find c, based on a and b
  (a, b, c)
}}
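
A rough, concrete version of that idea, assuming a comma-separated file with the header A,B,A_1,B_2 (file name, delimiter, and column order are assumptions for illustration):

import spark.implicits._ // assumes a SparkSession named spark

val header = Array("A", "B", "A_1", "B_2") // assumed column order in the file

val result = spark.read.textFile("data.csv")   // Dataset[String], one line per record
  .filter(_ != header.mkString(","))           // drop the assumed header line
  .map { line =>
    val fields = line.split(",")
    val a = fields(header.indexOf("A"))
    val b = fields(header.indexOf("B"))
    val c = fields(header.indexOf(s"${a}_${b}")) // resolve C while parsing
    (a, b, c)
  }
  .toDF("A", "B", "C")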

answered Oct 23 '22 by zero323