I am trying to speed up and limit the cost of taking several columns and their values and inserting them into a map in the same row. This is a requirement because we have a legacy system that is reading from this job and it isn't yet ready to be refactored. There is also another map with some data that needs to be combined with this.
Currently we have a few solutions all of which seem to result in about the same run time on the same cluster with around 1TB of data stored in Parquet:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
def jsonToMap(s: String, map: Map[String, String]): Map[String, String] = {
implicit val formats = org.json4s.DefaultFormats
val jsonMap = if(!s.isEmpty){
parse(s).extract[Map[String, String]]
} else {
Map[String, String]()
}
if(map != null){
map ++ jsonMap
} else {
jsonMap
}
}
val udfJsonToMap = udf(jsonToMap _)
def addMap(key:String, value:String, map: Map[String,String]): Map[String,String] = {
if(map == null) {
Map(key -> value)
} else {
map + (key -> value)
}
}
val addMapUdf = udf(addMap _)
val output = raw.columns.foldLeft(raw.withColumn("allMap", typedLit(Map.empty[String, String]))) { (memoDF, colName) =>
if(colName.startsWith("columnPrefix/")){
memoDF.withColumn("allMap", when(col(colName).isNotNull, addMapUdf(substring_index(lit(colName), "/", -1), col(colName), col("allTagsMap")) ))
} else if(colName.equals("originalMap")){
memoDF.withColumn("allMap", when(col(colName).isNotNull, udfJsonToMap(col(colName), col("allMap"))))
} else {
memoDF
}
}
takes about 1h on 9 m5.xlarge
val resourceTagColumnNames = raw.columns.filter(colName => colName.startsWith("columnPrefix/"))
def structToMap: Row => Map[String,String] = { row =>
row.getValuesMap[String](resourceTagColumnNames)
}
val structToMapUdf = udf(structToMap)
val experiment = raw
.withColumn("allStruct", struct(resourceTagColumnNames.head, resourceTagColumnNames.tail:_*))
.select("allStruct")
.withColumn("allMap", structToMapUdf(col("allStruct")))
.select("allMap")
Also runs in about 1h on the same cluster
This code all works but it isn't fast enough it is about 10 times longer than every other transform we have right now and it is a bottle neck for us.
Is there another way to get this result that is more efficient?
Edit: I have also tried limiting the data by a key however because the values in the columns I am merging can change despite the key remaining the same I cannot limit the data size without risking data loss.
Tl;DR: using only spark sql built-in functions can significantly speedup computation
As explained in this answer, spark sql native functions are more performant than user-defined functions. So we can try to implement the solution to your problem using only spark sql native functions.
I show two main versions of implementation. One using all the sql functions existing in last version of Spark available at the time I wrote this answer, which is Spark 3.0. And another using only sql functions existing in spark version when the question was asked, so functions existing in Spark 2.3. All the used functions in this version are also available in Spark 2.2
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}
val mapFromPrefixedColumns = map_filter(
map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*),
(_, v) => v.isNotNull
)
val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
map()
)
val comprehensiveMapExpr = map_concat(mapFromPrefixedColumns, mapFromOriginalMap)
raw.withColumn("allMap", comprehensiveMapExpr)
In spark 2.2, We don't have the functions map_concat
(available in spark 2.4) and map_filter
(available in spark 3.0).
I replace them with user-defined functions:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}
def filterNull(map: Map[String, String]): Map[String, String] = map.toSeq.filter(_._2 != null).toMap
val filter_null_udf = udf(filterNull _)
def mapConcat(map1: Map[String, String], map2: Map[String, String]): Map[String, String] = map1 ++ map2
val map_concat_udf = udf(mapConcat _)
val mapFromPrefixedColumns = filter_null_udf(
map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c.dropWhile(_ != '/').tail), col(c))): _*)
)
val mapFromOriginalMap = when(col("originalMap").isNotNull && col("originalMap").notEqual(""),
from_json(col("originalMap"), MapType(StringType, StringType))
).otherwise(
map()
)
val comprehensiveMapExpr = map_concat_udf(mapFromPrefixedColumns, mapFromOriginalMap)
raw.withColumn("allMap", comprehensiveMapExpr)
The last part of the question contains a simplified code without mapping of the json column and without filtering of null values in result map. I created the following implementation for this specific case. As I don't use functions that were added between spark 2.2 and spark 3.0, I don't need two versions of this implementation:
import org.apache.spark.sql.functions._
val mapFromPrefixedColumns = map(raw.columns.filter(_.startsWith("columnPrefix/")).flatMap(c => Seq(lit(c), col(c))): _*)
raw.withColumn("allMap", mapFromPrefixedColumns)
For the following dataframe as input:
+--------------------+--------------------+--------------------+----------------+
|columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|originalMap |
+--------------------+--------------------+--------------------+----------------+
|a |1 |x |{"column4": "k"}|
|b |null |null |null |
|c |null |null |{} |
|null |null |null |null |
|d |2 |null | |
+--------------------+--------------------+--------------------+----------------+
I obtain the following allMap
column:
+--------------------------------------------------------+
|allMap |
+--------------------------------------------------------+
|[column1 -> a, column2 -> 1, column3 -> x, column4 -> k]|
|[column1 -> b] |
|[column1 -> c] |
|[] |
|[column1 -> d, column2 -> 2] |
+--------------------------------------------------------+
And for the mapping without json column:
+---------------------------------------------------------------------------------+
|allMap |
+---------------------------------------------------------------------------------+
|[columnPrefix/column1 -> a, columnPrefix/column2 -> 1, columnPrefix/column3 -> x]|
|[columnPrefix/column1 -> b, columnPrefix/column2 ->, columnPrefix/column3 ->] |
|[columnPrefix/column1 -> c, columnPrefix/column2 ->, columnPrefix/column3 ->] |
|[columnPrefix/column1 ->, columnPrefix/column2 ->, columnPrefix/column3 ->] |
|[columnPrefix/column1 -> d, columnPrefix/column2 -> 2, columnPrefix/column3 ->] |
+---------------------------------------------------------------------------------+
I generated a csv file of 10 millions lines, uncompressed (about 800 Mo), containing one column without column prefix, nine columns with column prefix, and one colonne containing json as a string:
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|id |columnPrefix/column1|columnPrefix/column2|columnPrefix/column3|columnPrefix/column4|columnPrefix/column5|columnPrefix/column6|columnPrefix/column7|columnPrefix/column8|columnPrefix/column9|originalMap |
+---+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+
|1 |iwajedhor |ijoefzi |der |ob |galsu |ril |le |zaahuz |fuzi |{"column10":"true"}|
|2 |ofo |davfiwir |lebfim |roapej |lus |roum |te |javes |karutare |{"column10":"true"}|
|3 |jais |epciel |uv |piubnak |saajo |doke |ber |pi |igzici |{"column10":"true"}|
|4 |agami |zuhepuk |er |pizfe |lafudbo |zan |hoho |terbauv |ma |{"column10":"true"}|
...
The benchmark is to read this csv file, create the column allMap
, and write this column to parquet. I ran this on my local machine and I obtained the following results
+--------------------------+--------------------+-------------------------+-------------------------+
| implementations | current (with udf) | sql functions spark 3.0 | sql functions spark 2.2 |
+--------------------------+--------------------+-------------------------+-------------------------+
| execution time | 138 seconds | 48 seconds | 82 seconds |
| improvement from current | 0 % faster | 64 % faster | 40 % faster |
+--------------------------+--------------------+-------------------------+-------------------------+
I also ran against the second implementation in the question, that drop the mapping of the json column and the filtering of null value in map.
+--------------------------+-----------------------+------------------------------------+
| implementations | current (with struct) | sql functions without json mapping |
+--------------------------+-----------------------+------------------------------------+
| execution time | 46 seconds | 35 seconds |
| improvement from current | 0 % | 23 % faster |
+--------------------------+-----------------------+------------------------------------+
Of course, the benchmark is rather basic, but we can see an improvement compared to the implementations that use user-defined functions
When you have a performance issue and you use user-defined functions, it can be a good idea to try to replace those user-defined functions by spark sql functions
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With