
Spark - How to make a map serializable

I need to extract and transform some information from a big dataset, which will later be consumed by another dataset.

Since the information to be consumed is always the same, and since it can be stored as key-value pairs, I was considering saving it in a lookup map that a udf reads, so that I avoid repeated calls to the big dataset.

The problem is I am getting the following error:

org.apache.spark.SparkException: Task not serializable

Is there any way to make the map serializable?

If that is not possible, is there another way to store information in a lookup object in Spark?

Here is my code:

val cityTimeZone: scala.collection.immutable.Map[String, Double] = Map(
  "CEB" -> 8.0, "LGW" -> 0.0, "CPT" -> 2.0, "MUC" -> 1.0, "SGN" -> 7.0,
  "BNE" -> 10.0, "DME" -> 3.0, "FJR" -> 4.0, "BAH" -> 3.0, "ARN" -> 1.0,
  "FCO" -> 1.0, "DUS" -> 1.0, "MRU" -> 4.0, "JFK" -> -5.0, "GLA" -> 0.0)

def getLocalHour = udf ((city:String, timeutc:Int) => {
    val timeOffset = cityTimeZone(city)
    val localtime = if((timeutc+timeOffset)%24 >= 0)(timeutc+timeOffset)%24 else ((timeutc+timeOffset)%24)*(-1)
    localtime
})

// $"dateutc" is a timestamp column like 2017-03-01 03:45:00, and $"city" is a three-letter uppercase code like those in the map above

val newDF = DF
.select("dateutc","city")
.withColumn("utchour", hour($"dateutc"))
.withColumn("localhour", getLocalHour($"city", $"utchour"))

display(newDF)

Ignacio Alorre asked Dec 07 '25 04:12



1 Answer

The member variable declaration

val cityTimeZone  

in combination with

cityTimeZone(city)

inside the udf is problematic, because the latter is just a shortcut for

this.cityTimeZone(city)

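where this is presumably some large non-serializable object, probably because it holds a reference to a non-serializable Spark context. The function passed to udf therefore captures this, and Spark fails when it tries to serialize the whole enclosing object together with the function.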

Make getLocalHour a lazy val, and move the map that is needed by the udf inside the definition of getLocalHour as a local variable, something along these lines:

lazy val getLocalHour = {
  // Local copy of the lookup table: the closure captures only this map, not `this`.
  val cityTimeZone: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)
  udf((city: String, timeutc: Int) => {
    val timeOffset = cityTimeZone(city)
    // Normalize into [0, 24) so that negative sums wrap around (e.g. -2 becomes 22).
    ((timeutc + timeOffset) % 24 + 24) % 24
  })
}
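
The capture of this can be reproduced without Spark, since Spark serializes closures with plain Java serialization. The following is only a sketch: Job, capturingThis, capturingLocal and SerializationCheck are hypothetical names made up for the illustration, and Job merely stands in for whatever enclosing class or notebook wrapper holds the map.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the enclosing object; note that it does NOT extend Serializable.
class Job {
  val offsets: Map[String, Double] = Map("CEB" -> 8.0, "LGW" -> 0.0)

  // `offsets(city)` means `this.offsets(city)`, so the closure captures `this`.
  def capturingThis: String => Double = city => offsets(city)

  // The member is copied to a local val first, so the closure captures only the Map.
  def capturingLocal: String => Double = {
    val localOffsets = offsets
    city => localOffsets(city)
  }
}

object SerializationCheck {
  // Returns true if Java serialization accepts the value, false if it rejects it.
  def serializes(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

Under these assumptions, SerializationCheck.serializes(new Job().capturingThis) is expected to be rejected, while the capturingLocal variant should pass, which mirrors the difference between the member variable and the local variable in the udf.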

Alternatively, attach cityTimeZone to a serializable object, i.e. one that holds no references to threads, sockets, Spark contexts, or other non-serializable state. A package object with utility methods and constants would be fine.
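
A minimal sketch of this alternative, assuming a standalone object called TimeZones (a hypothetical name, as are localHour and getLocalHour on it) that holds nothing but the lookup table and a pure helper. Wrapping the result into [0, 24) is an assumption about the intended behaviour, and the map is shortened to three entries. In a notebook, objects defined in a cell may still be wrapped by the REPL, so this is more likely to behave as intended when the object lives in a compiled jar.

```scala
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

// Hypothetical utility object: only constants and pure functions, no Spark context.
object TimeZones extends Serializable {
  val cityTimeZone: Map[String, Double] =
    Map("CEB" -> 8.0, "LGW" -> 0.0, "JFK" -> -5.0)

  // Local hour in [0, 24); throws NoSuchElementException for unknown city codes.
  def localHour(city: String, utcHour: Int): Double = {
    val offset = cityTimeZone(city)
    ((utcHour + offset) % 24 + 24) % 24
  }

  // The udf refers only to members of this object, not to any enclosing class.
  val getLocalHour: UserDefinedFunction =
    udf((city: String, utcHour: Int) => localHour(city, utcHour))
}
```

It would then be used as .withColumn("localhour", TimeZones.getLocalHour($"city", $"utchour")).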

If the udf definition contains references to any other member variables, treat those accordingly.

Andrey Tyukin answered Dec 08 '25 18:12
