
Converting multiple different columns to Map column with Spark Dataframe scala

I have a data frame with columns: user, address1, address2, address3, phone1, phone2 and so on. I want to convert this data frame into one with columns user, address, phone, where address = Map("address1" -> address1.value, "address2" -> address2.value, "address3" -> address3.value).

I was able to convert the columns to map using:

val mapData = List("address1", "address2", "address3")
df.map(_.getValuesMap[Any](mapData))

but I am not sure how to add this back to my data frame.

I am new to Spark and Scala and could really use some help here.

asked Oct 18 '15 by Jds

People also ask

How do I combine columns in spark data frame?

Spark SQL provides the concat() function to concatenate two or more DataFrame columns into a single column. It can also take columns of different data types and concatenate them into a single column; for example, it supports String, Int, Boolean and also arrays.
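
A minimal sketch (column names and data here are made up for illustration, assuming a spark-shell session):

import org.apache.spark.sql.functions.{concat, lit, col}

val people = sc.parallelize(Seq(("Jane", "Doe"))).toDF("first_name", "last_name")

// concat joins the two name columns, with a literal space between them,
// into a single string column: "Jane Doe"
people.select(
  concat(col("first_name"), lit(" "), col("last_name")).alias("full_name")
).show()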

How do I create a column map in spark?

We can create a map column type with the createMapType() function on the DataTypes class. This method takes two arguments, keyType and valueType, and both must be types that extend DataType.
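
For example, a small sketch (the schema and field names are hypothetical):

import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// a MapType with String keys and String values
val addressMapType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)

// it can then be used when defining a schema explicitly
val schema = StructType(Seq(
  StructField("user", DataTypes.LongType),
  StructField("address", addressMapType)
))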

How do I select multiple columns in spark data frame?

You can select one or more columns of a Spark DataFrame by passing the column names you want to the select() function. Since a DataFrame is immutable, this creates a new DataFrame with the selected columns. The show() function is used to display the DataFrame contents.
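
A quick sketch, reusing the df and column names from the question above:

import org.apache.spark.sql.functions.col

// select a single column by name
df.select("user").show()

// select several columns by name
df.select("user", "address1", "phone1").show()

// or using Column objects
df.select(col("user"), col("address1")).show()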


1 Answer

Spark >= 2.0

You can skip the UDF and use the map SQL function (create_map in Python):

import org.apache.spark.sql.functions.{map, lit, col}

// build an alternating list of key literals and value columns:
// lit("address1"), col("address1"), lit("address2"), col("address2"), ...
df.select(
  map(mapData.map(c => lit(c) :: col(c) :: Nil).flatten: _*).alias("a_map")
)
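
If you also want to keep the other columns, they can go in the same select, for example (a sketch reusing the user column from the question):

df.select(
  col("user"),
  map(mapData.map(c => lit(c) :: col(c) :: Nil).flatten: _*).alias("address")
)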

Spark < 2.0

As far as I know there is no direct way to do it. You can use a UDF like this:

import org.apache.spark.sql.functions.{udf, array, lit, col}

val df = sc.parallelize(Seq(
  (1L, "addr1", "addr2", "addr3")
)).toDF("user", "address1", "address2", "address3")

// zip keys with values and drop entries whose value is null
val asMap = udf((keys: Seq[String], values: Seq[String]) =>
  keys.zip(values).filter {
    case (_, null) => false
    case _ => true
  }.toMap)

// the column names as string literals (map keys) and the columns themselves (map values)
val keys = array(mapData.map(lit): _*)
val values = array(mapData.map(col): _*)

val dfWithMap = df.withColumn("address", asMap(keys, values))
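
Individual values can then be looked up from the map column by key, for example:

// getItem extracts a value from the map column by key
dfWithMap.select(col("address").getItem("address1")).show()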

Another option, which doesn't require UDFs, is to use a struct field instead of a map (struct also comes from org.apache.spark.sql.functions):

val dfWithStruct = df.withColumn("address", struct(mapData.map(col): _*))

The biggest advantage is that it can easily handle values of different types.
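
Fields of the struct are then addressed with dot notation rather than map keys, for example:

// access a nested struct field with dot notation
dfWithStruct.select(col("address.address1")).show()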

answered Oct 22 '22 by zero323