Merging several columns into a single map column in Spark Scala has efficiency problems, how can we do it better?

I am trying to speed up, and limit the cost of, taking several columns and their values and inserting them into a map in the same row. This is a requirement because we have a legacy system that reads from this job and isn't yet ready to be refactored. There is also another map, stored as a JSON string column, whose data needs to be combined with this one.

Currently we have a few solutions, all of which seem to result in about the same run time on the same cluster, with around 1 TB of data stored in Parquet:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._

// Parse a JSON string into a Map and merge it with an existing map (which may be null)
def jsonToMap(s: String, map: Map[String, String]): Map[String, String] = {
  implicit val formats = org.json4s.DefaultFormats
  val jsonMap = if (!s.isEmpty) {
    parse(s).extract[Map[String, String]]
  } else {
    Map[String, String]()
  }
  if (map != null) {
    map ++ jsonMap
  } else {
    jsonMap
  }
}
val udfJsonToMap = udf(jsonToMap _)

// Add a single key/value pair to a map that may be null
def addMap(key: String, value: String, map: Map[String, String]): Map[String, String] = {
  if(map == null) {
    Map(key -> value)
  } else {
    map + (key -> value)
  }
}

val addMapUdf = udf(addMap _)

// Fold over all columns, accumulating the prefixed columns and the JSON map into allMap
val output = raw.columns.foldLeft(raw.withColumn("allMap", typedLit(Map.empty[String, String]))) { (memoDF, colName) =>
  if (colName.startsWith("columnPrefix/")) {
    // Strip the prefix and add the column's value to the accumulated map
    memoDF.withColumn("allMap",
      when(col(colName).isNotNull, addMapUdf(substring_index(lit(colName), "/", -1), col(colName), col("allMap")))
        .otherwise(col("allMap")))
  } else if (colName.equals("originalMap")) {
    // Merge the parsed JSON map into the accumulated map
    memoDF.withColumn("allMap",
      when(col(colName).isNotNull, udfJsonToMap(col(colName), col("allMap")))
        .otherwise(col("allMap")))
  } else {
    memoDF
  }
}

Takes about 1 hour on 9 m5.xlarge instances.

import org.apache.spark.sql.Row

val resourceTagColumnNames = raw.columns.filter(colName => colName.startsWith("columnPrefix/"))

// Convert the struct of prefixed columns into a map keyed by column name
def structToMap: Row => Map[String, String] = { row =>
  row.getValuesMap[String](resourceTagColumnNames)
}
val structToMapUdf = udf(structToMap)

val experiment = raw
  .withColumn("allStruct", struct(resourceTagColumnNames.head, resourceTagColumnNames.tail:_*))
  .select("allStruct")
  .withColumn("allMap", structToMapUdf(col("allStruct")))
  .select("allMap")

Also runs in about 1 hour on the same cluster.

This code all works, but it isn't fast enough: it takes about 10 times longer than every other transform we have right now, and it is a bottleneck for us.

Is there another way to get this result that is more efficient?

Edit: I have also tried limiting the data by a key; however, because the values in the columns I am merging can change even when the key stays the same, I cannot limit the data size without risking data loss.

asked Nov 08 '22 by alexddupree

1 Answer

TL;DR: using only Spark SQL built-in functions can significantly speed up the computation.

As explained in this answer, Spark SQL native functions are more performant than user-defined functions, mainly because Catalyst can optimize them and they avoid serializing row data to and from JVM objects. So we can try to implement the solution to your problem using only Spark SQL native functions.

I show two main versions of the implementation: one using all the SQL functions available in the latest version of Spark at the time of writing this answer, which is Spark 3.0, and another using only the SQL functions available in the Spark version current when the question was asked, Spark 2.3. All the functions used in that second version are also available in Spark 2.2.

Spark 3.0 implementation with SQL functions

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

// Build a map from all prefixed columns (key = column name without the prefix),
// dropping entries whose value is null
val mapFromPrefixedColumns = map_filter(
  map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*),
  (_, v) => v.isNotNull
)

// Parse the JSON string column into a map, falling back to an empty map
val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
  from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
  map()
)

// Concatenate both maps and add the result as the allMap column
val comprehensiveMapExpr = map_concat(mapFromPrefixedColumns, mapFromOriginalMap)

raw.withColumn("allMap", comprehensiveMapExpr)
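
If, as in the second snippet of the question, only the merged map is needed downstream, everything else can be dropped afterwards. A minimal usage sketch, assuming the legacy reader only needs the allMap column:

// Keep only the merged map column (assumption: downstream only reads allMap)
val onlyMap = raw
  .withColumn("allMap", comprehensiveMapExpr)
  .select("allMap")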

Spark 2.2 implementation with SQL functions

In Spark 2.2 we don't have the functions map_concat (available since Spark 2.4) and map_filter (available since Spark 3.0), so I replace them with user-defined functions:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

// UDF replacement for map_filter: drop entries whose value is null
def filterNull(map: Map[String, String]): Map[String, String] = map.toSeq.filter(_._2 != null).toMap
val filter_null_udf = udf(filterNull _)

// UDF replacement for map_concat: merge two maps
def mapConcat(map1: Map[String, String], map2: Map[String, String]): Map[String, String] = map1 ++ map2
val map_concat_udf = udf(mapConcat _)

// Build a map from all prefixed columns (key = column name without the prefix), dropping null values
val mapFromPrefixedColumns = filter_null_udf(
  map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*)
)

val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
  from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
  map()
)

val comprehensiveMapExpr = map_concat_udf(mapFromPrefixedColumns, mapFromOriginalMap)

raw.withColumn("allMap", comprehensiveMapExpr)

Implementation with SQL functions, without JSON mapping

The last part of the question contains a simplified version of the code, without the mapping of the JSON column and without filtering null values out of the resulting map. I created the following implementation for this specific case. As I don't use any function added between Spark 2.2 and Spark 3.0, I don't need two versions of this implementation:

import org.apache.spark.sql.functions._

// Keys keep the full prefixed column names, and null values are not filtered out
val mapFromPrefixedColumns = map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c), col(c))): _*)
raw.withColumn("allMap", mapFromPrefixedColumns)

Run

For the following DataFrame as input:

+--------------------+--------------------+--------------------+----------------+
|columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|originalMap     |
+--------------------+--------------------+--------------------+----------------+
|a                   |1                   |x                   |{"column4": "k"}|
|b                   |null                |null                |null            |
|c                   |null                |null                |{}              |
|null                |null                |null                |null            |
|d                   |2                   |null                |                |
+--------------------+--------------------+--------------------+----------------+

I obtain the following allMap column:

+--------------------------------------------------------+
|allMap                                                  |
+--------------------------------------------------------+
|[column1 -> a, column2 -> 1, column3 -> x, column4 -> k]|
|[column1 -> b]                                          |
|[column1 -> c]                                          |
|[]                                                      |
|[column1 -> d, column2 -> 2]                            |
+--------------------------------------------------------+

And for the implementation without JSON mapping (the keys keep the column prefix because the full column names are used as map keys, and null values are not filtered out):

+---------------------------------------------------------------------------------+
|allMap                                                                           |
+---------------------------------------------------------------------------------+
|[columnPrefix/column1 -> a, columnPrefix/column2 -> 1, columnPrefix/column3 -> x]|
|[columnPrefix/column1 -> b, columnPrefix/column2 ->, columnPrefix/column3 ->]    |
|[columnPrefix/column1 -> c, columnPrefix/column2 ->, columnPrefix/column3 ->]    |
|[columnPrefix/column1 ->, columnPrefix/column2 ->, columnPrefix/column3 ->]      |
|[columnPrefix/column1 -> d, columnPrefix/column2 -> 2, columnPrefix/column3 ->]  |
+---------------------------------------------------------------------------------+

Benchmark

I generated an uncompressed CSV file of 10 million lines (about 800 MB), containing one column without the column prefix, nine columns with the column prefix, and one column containing JSON as a string; a sketch of how such a file could be generated follows the sample below:

+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|id |columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|columnPrefix/column4|columnPrefix/column5|columnPrefix/column6|columnPrefix/column7|columnPrefix/column8|columnPrefix/column9|originalMap        |
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|1  |iwajedhor           |ijoefzi             |der                 |ob                  |galsu               |ril                 |le                  |zaahuz              |fuzi                |{"column10":"true"}|
|2  |ofo                 |davfiwir            |lebfim              |roapej              |lus                 |roum                |te                  |javes               |karutare            |{"column10":"true"}|
|3  |jais                |epciel              |uv                  |piubnak             |saajo               |doke                |ber                 |pi                  |igzici              |{"column10":"true"}|
|4  |agami               |zuhepuk             |er                  |pizfe               |lafudbo             |zan                 |hoho                |terbauv             |ma                  |{"column10":"true"}|
...
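
A rough sketch of how a similar test file could be generated (the row count, the random-looking values, and the output path are assumptions for illustration, not part of the original benchmark):

import org.apache.spark.sql.functions._

// Hypothetical generator: ten million rows, nine prefixed string columns filled
// with random-looking values, and a constant JSON string column
val generated = (1 to 9).foldLeft(spark.range(1, 10000001).toDF("id")) { (df, i) =>
  df.withColumn(s"columnPrefix/column$i", expr("substring(uuid(), 1, 8)"))
}

generated
  .withColumn("originalMap", lit("""{"column10":"true"}"""))
  .write.option("header", "true").mode("overwrite").csv("/tmp/benchmark_input.csv")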

The benchmark reads this CSV file, creates the allMap column, and writes that column to Parquet. I ran this on my local machine and obtained the results shown in the table that follows.
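
A minimal sketch of what such a benchmark harness could look like, using the Spark 3.0 implementation from above (the file paths and the timing approach are assumptions, not part of the original answer):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

// Assumed input path; the CSV is read with its header so the prefixed column names are preserved
val raw = spark.read.option("header", "true").csv("/tmp/benchmark_input.csv")

// Spark 3.0 implementation from above, applied to the benchmark data
val allMapExpr = map_concat(
  map_filter(
    map(raw.columns.filter(_.startsWith("columnPrefix/"))
      .flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*),
    (_, v) => v.isNotNull),
  when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
    from_json(col("originalMap"), MapType(StringType, StringType))
  ).otherwise(map())
)

// Time the end-to-end job: build allMap and write only that column to Parquet
val start = System.nanoTime()
raw.withColumn("allMap", allMapExpr)
  .select("allMap")
  .write.mode("overwrite").parquet("/tmp/benchmark_output.parquet") // assumed output path
println(s"Elapsed: ${(System.nanoTime() - start) / 1e9} seconds")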

+--------------------------+--------------------+-------------------------+-------------------------+
|     implementations      | current (with udf) | sql functions spark 3.0 | sql functions spark 2.2 |
+--------------------------+--------------------+-------------------------+-------------------------+
| execution time           | 138 seconds        | 48 seconds              | 82 seconds              |
| improvement from current | 0 % faster         | 64 % faster             | 40 % faster             |
+--------------------------+--------------------+-------------------------+-------------------------+

I also ran a comparison against the second implementation in the question, which drops the mapping of the JSON column and the filtering of null values in the map.

+--------------------------+-----------------------+------------------------------------+
| implementations          | current (with struct) | sql functions without json mapping |
+--------------------------+-----------------------+------------------------------------+
| execution time           | 46 seconds            | 35 seconds                         |
| improvement from current | 0 %                   | 23 % faster                        |
+--------------------------+-----------------------+------------------------------------+

Of course, the benchmark is rather basic, but it shows a clear improvement compared to the implementations that use user-defined functions.

Conclusion

When you have a performance issue and you are using user-defined functions, it can be a good idea to try replacing those user-defined functions with Spark SQL built-in functions.

answered Nov 14 '22 by Vincent Doba