 

Spark Task not serializable (Case Classes)

Spark throws Task not serializable when I use a case class, or a class/object that extends Serializable, inside a closure.

object WriteToHbase extends Serializable {
    def main(args: Array[String]) {
        val csvRows: RDD[Array[String]] = ...
        val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
        val usersRDD = csvRows.map(row => {
            new UserTable(row(0), row(1), row(2), row(9), row(10), row(11))
        })
        processUsers(sc, usersRDD, dateFormatter)
    }
}

def processUsers(sc: SparkContext, usersRDD: RDD[UserTable], dateFormatter: DateTimeFormatter): Unit = {

    usersRDD.foreachPartition(part => {

        val conf = HBaseConfiguration.create()
        val table = new HTable(conf, tablename)

        part.foreach(userRow => {
            val id = userRow.id
            val date1 = dateFormatter.parseDateTime(userRow.date1)
        })
        table.flushCommits()
        table.close()
    })
}

My first attempt was to use a case class:

case class UserTable(id: String, name: String, address: String, ...) extends Serializable

My second attempt was to use a class instead of a case class:

class UserTable(val id: String, val name: String, val address: String, ...) extends Serializable

My third attempt was to add a companion object to the class:

object UserTable extends Serializable {
    def apply(id: String, name: String, address: String, ...) = new UserTable(id, name, address, ...)
}
asked May 27 '15 by sophie

People also ask

What are the causes of NotSerializableException in spark?

The error was triggered when I initialized a variable on the driver (master) but then tried to use it on one of the workers. When that happens, Spark Streaming tries to serialize the object to send it over to the worker, and fails if the object is not serializable. I solved the error by making the variable static.
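
In Scala there is no static keyword; the closest equivalent is a member of an object. A minimal sketch of that idea, reusing the Joda-Time formatter from the question (the Formats name is illustrative, not from the original post):

import org.joda.time.format.DateTimeFormat

// Object members behave like statics: they are initialized once per JVM on
// each executor, so the formatter is never shipped from the driver and
// nothing needs to be serialized with the closure.
object Formats {
    val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
}

// Inside an RDD operation:
// usersRDD.map(u => Formats.dateFormatter.parseDateTime(u.date1))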

Is Scala case class serializable?

Yes. All case classes automatically extend Product and Serializable, so declaring extends Serializable by hand is redundant. It may look ugly, but it is built into the language.
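
A quick way to check this (the User class here is made up for illustration):

case class User(id: String, name: String)

// Prints true: the compiler mixes Serializable into every case class, so an
// explicit "extends Serializable" adds nothing.
println(classOf[java.io.Serializable].isAssignableFrom(classOf[User]))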

What is spark serialization?

Serialization is used for performance tuning in Apache Spark. All data that is sent over the network, written to disk, or persisted in memory must be serialized, so serialization plays an important role in costly operations.
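
When the default Java serialization becomes a bottleneck, Spark can be switched to Kryo. A minimal sketch using the standard configuration keys (registering UserTable here is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
    .setAppName("WriteToHbase")
    // Kryo is usually faster and more compact than Java serialization
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    // Registering classes avoids writing full class names with each record
    .registerKryoClasses(Array(classOf[UserTable]))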


1 Answer

Most likely the function "doSomething" is defined on your class, which isn't serializable. Instead, move the "doSomething" function to a companion object (i.e. make it effectively static).
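
Applied to the code in the question, that could look like the following sketch; the parseDate helper is an illustrative name, not something from the original post:

import org.joda.time.format.DateTimeFormat

object UserTable {
    // Lives on the companion object (effectively static), so a closure that
    // calls it does not capture and serialize an enclosing instance.
    private val dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

    def parseDate(s: String) = dateFormatter.parseDateTime(s)
}

// usersRDD.foreach(u => UserTable.parseDate(u.date1))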

answered Sep 28 '22 by Holden