Unable to groupBy MapType column within Spark DataFrame

I'm getting the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression 'mapField' cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.;;

What I'm trying to do is group the entries of a DataFrame by a given set of columns, but it fails when one of the grouping columns is a MapType column such as the one mentioned above.

  .groupBy(
    ...
    "mapField",
    ...
  )

I've got a couple of ideas, but there must be a much easier solution than the ones I've thought of:

  • I have the key and value of each element saved as a concatenated string within the DF, so I could parse those into a Map and save it using withColumn. I haven't found an existing approach for this, and I couldn't get my own working either. Is this reasonable to do?

  • Convert to an RDD, group there, then convert back to a DF (too much hassle, I think).

EDIT

Example input

   id    |  myMap
'sample' |  Map('a' -> 1, 'b' -> 2, 'c' -> 3)

Desired output

   id    |  a  |  b  |  c
'sample' |  1  |  2  |  3

Lenny D. asked Nov 07 '22 17:11



1 Answer

As the error says, map<string,string> is not an orderable data type, so you need to represent the map with a type that is orderable. Array is one such type, so we can use map_values and map_keys to extract the map data into two separate fields, as shown below:

import org.apache.spark.sql.functions.{map_values, map_keys}
// Outside spark-shell, `import spark.implicits._` is also needed for toDF and $"..."
val df = Seq(
    (Map("k1"->"v1"), 12),
    (Map("k2"->"v2"), 11),
    (null, 10)
).toDF("map", "id")

df.select(map_values($"map").as("map_values")).show

// +----------+
// |map_values|
// +----------+
// |      [v1]|
// |      [v2]|
// |      null|
// +----------+

df.select(map_keys($"map").as("map_keys")).show

// +--------+
// |map_keys|
// +--------+
// |    [k1]|
// |    [k2]|
// |    null|
// +--------+

Then you can add the extracted array as a column and group by it directly:

df.withColumn("map_keys", map_keys($"map")).groupBy("map_keys").count()
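One caveat worth checking on your own data: map_keys and map_values return the entries in the order the map stores them, so two maps with the same entries in a different order may end up in different groups. A minimal sketch that groups on the sorted entries instead, assuming Spark 3.0 or later (for map_entries) and a local session; the sample data and the map_entries column name are made up for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{map_entries, sort_array}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("groupByMap").getOrCreate()
import spark.implicits._

// Hypothetical sample data: the first two maps hold the same entries in a different order.
val df = Seq(
  (Map("a" -> "1", "b" -> "2"), 12),
  (Map("b" -> "2", "a" -> "1"), 11),
  (Map("c" -> "3"), 10)
).toDF("map", "id")

// map_entries turns the map into array<struct<key, value>>;
// sort_array puts the entries into one canonical order, which is a groupable type.
val grouped = df
  .withColumn("map_entries", sort_array(map_entries($"map")))
  .groupBy("map_entries")
  .count()
```

Grouping on a single sorted array of entries also keeps each key next to its value, which grouping on map_keys alone does not.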

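For a reusable version, wrap the extraction in a function:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{map_keys, map_values}

// Replaces the map column with two array columns: <mapField>_keys and <mapField>_values
def unwrapMap(mapField: String): DataFrame => DataFrame = { df =>
  df.withColumn(s"${mapField}_keys", map_keys(df(mapField)))
    .withColumn(s"${mapField}_values", map_values(df(mapField)))
    .drop(df(mapField))
}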

Usage, with the example DataFrame above: df.transform(unwrapMap("map"))
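The EDIT in the question asks for one column per map key rather than a grouping. A hedged sketch of that, assuming the set of distinct keys is small enough to collect to the driver; the variable names (input, keys, wide) are hypothetical, and the data mirrors the question's example:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode, map_keys}

// Assumption: a local session for illustration; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("mapToColumns").getOrCreate()
import spark.implicits._

// Same shape as the example input in the question.
val input = Seq(("sample", Map("a" -> 1, "b" -> 2, "c" -> 3))).toDF("id", "myMap")

// Collect the distinct keys that occur in any row (this runs a Spark job).
val keys = input
  .select(explode(map_keys(col("myMap"))).as("key"))
  .distinct()
  .as[String]
  .collect()
  .sorted

// One column per key; rows whose map lacks a key get null in that column.
val wide = input.select(col("id") +: keys.map(k => col("myMap").getItem(k).as(k)): _*)
```

The resulting columns hold plain values, so they can be passed to groupBy without the orderable-type error.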

abiratsis answered Nov 14 '22 21:11
