 

log from spark udf to driver

I have a simple UDF used in Spark on Databricks. I can't just use println or log4j, because the output ends up in the executor logs; I need it on the driver. So I have a very simple log setup:

var logMessage = ""

def log(msg: String){
  logMessage += msg + "\n"
}

def writeLog(file: String){
  println("start write")
  println(logMessage)
  println("end write")
}

def warning(msg: String){
  log("*WARNING* " + msg)
}

val CleanText = (s: Int) => {
  log("I am in this UDF")
  s + 2
}

sqlContext.udf.register("CleanText", CleanText)

How can I get this to work properly and log to the driver?

asked Aug 28 '18 by test acc

People also ask

Why we should not use UDF in spark?

It is well known that the use of UDFs (User Defined Functions) in Apache Spark, especially with the Python API, can compromise application performance. For this reason, at Damavis we try to avoid them as much as possible in favour of native functions or SQL.
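For illustration, a minimal sketch of the same logic written both ways (the column name a and the plusTwoUdf helper are assumptions, not from the original):

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._  // assumes a SparkSession in scope named spark

val df = Seq(1, 2).toDF("a")

// UDF version: a black box that the Catalyst optimizer cannot see into
val plusTwoUdf = udf((x: Int) => x + 2)
df.select(plusTwoUdf(col("a")))

// native version: a plain Column expression the optimizer can reason about
df.select(col("a") + 2)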

How does UDF work in spark?

User-Defined Functions (UDFs) are user-programmable routines that act on one row. The Spark documentation lists the classes that are required for creating and registering UDFs. It also contains examples that demonstrate how to define and register UDFs and invoke them in Spark SQL.
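A hedged sketch of that define-register-invoke cycle (the names plusTwo and nums are illustrative assumptions; a SparkSession named spark is assumed in scope):

import spark.implicits._

// register a named UDF so it can be called from SQL
spark.udf.register("plusTwo", (x: Int) => x + 2)

// expose a DataFrame as a temp view and invoke the UDF from SQL
Seq(1, 2).toDF("a").createOrReplaceTempView("nums")
spark.sql("SELECT plusTwo(a) AS b FROM nums").show()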

Are spark UDFs distributed?

In distributed mode, Spark uses a master/worker architecture for execution. The central coordinator, called the driver, communicates with a potentially large number of distributed workers, called executors. The driver and each executor run in their own Java process (JVM), so a UDF executes inside executor JVMs, not on the driver.
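A small sketch of how you can observe this from code (assumes a SparkSession named spark; TaskContext.get() returns null outside a running task):

import org.apache.spark.TaskContext
import org.apache.spark.sql.functions.udf
import spark.implicits._

// inside a UDF we are in a task on an executor, so TaskContext is set
val whereAmI = udf((x: Int) => s"partition ${TaskContext.get().partitionId()}")

Seq(1, 2).toDF("a").select(whereAmI($"a")).show()

// on the driver there is no task, so TaskContext.get() is null
println(TaskContext.get() == null)  // true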


1 Answer

The closest mechanism in Apache Spark to what you're trying to do is accumulators. You can accumulate the log lines on the executors and access the result on the driver:

// imports needed for the accumulator and the udf helper:
import org.apache.spark.sql.functions.udf
import org.apache.spark.util.CollectionAccumulator
import spark.implicits._  // for toDF and $"a"

// create a collection accumulator using the spark context:
val logLines: CollectionAccumulator[String] = sc.collectionAccumulator("log")

// log function adds a line to the accumulator
def log(msg: String): Unit = logLines.add(msg)

// driver-side function prints the log using the accumulator's *value*
def writeLog(): Unit = {
  import scala.collection.JavaConverters._
  println("start write")
  logLines.value.asScala.foreach(println)
  println("end write")
}

val CleanText = udf((s: Int) => {
  log(s"I am in this UDF, got: $s")
  s + 2
})

// use the UDF in some transformation:
Seq(1, 2).toDF("a").select(CleanText($"a")).show()

writeLog()
// prints:
// start write
// I am in this UDF, got: 1
// I am in this UDF, got: 2
// end write

BUT: this isn't really recommended, especially not for logging. If you log on every record, this accumulator will eventually crash your driver with an OutOfMemoryError, or just slow you down horribly.

Since you're using Databricks, I would check what options they support for log aggregation, or simply use the Spark UI to view the executor logs.
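As a sketch of that second option (the logger name and CleanTextLogged are assumptions, not from the original): log4j messages emitted inside the UDF land in each executor's log, which the Spark UI exposes per executor.

import org.apache.log4j.Logger
import org.apache.spark.sql.functions.udf

val CleanTextLogged = udf((s: Int) => {
  // fetch the logger inside the UDF body: Logger is not serializable,
  // so it cannot be captured in the closure on the driver
  Logger.getLogger("CleanText").warn(s"I am in this UDF, got: $s")
  s + 2
})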

answered Oct 03 '22 by Tzach Zohar