 

How to add a SparkListener from pySpark in Python?

I want to create a Jupyter/IPython extension to monitor Apache Spark jobs.

Spark provides a REST API.

However, instead of polling the server, I want the event updates to be sent through callbacks.
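For reference, polling would mean something like the sketch below (assuming the default Spark UI port 4040 and the documented /api/v1 monitoring endpoints; exact field names can vary between Spark versions). This is the loop I would like to avoid:

import time
import requests

BASE = "http://localhost:4040/api/v1"  # driver UI, default port

def poll_jobs(interval=5):
    """Poll the monitoring REST API and print job progress."""
    while True:
        for app in requests.get("{}/applications".format(BASE)).json():
            jobs = requests.get(
                "{}/applications/{}/jobs".format(BASE, app["id"])).json()
            for job in jobs:
                print(job["jobId"], job["status"],
                      job["numCompletedTasks"], "/", job["numTasks"])
        time.sleep(interval)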

I am trying to register a SparkListener with SparkContext.addSparkListener(). This method is not exposed on the PySpark SparkContext object in Python, so how can I register a Python listener with the Scala/Java version of the context from Python? Is it possible to do this through Py4j? I want Python functions to be called when the events fire in the listener.

asked May 20 '17 by Krishnan R



1 Answer

It is possible, although it is a bit involved. We can use the Py4j callback mechanism to pass messages from a SparkListener. First, let's create a Scala package with all the required classes. Directory structure:

.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala

build.sbt:

name := "listener"

organization := "net.zero323"

scalaVersion := "2.11.7"

val sparkVersion = "2.1.0"

libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)

Listener.scala defines the interface we are going to implement later in Python:

package net.zero323.spark.examples.listener

/* You can add arbitrary methods here, 
 * as long as these match corresponding Python interface 
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types, 
   * for example here String => Unit */
  def notify(x: Any): Any
}

Manager.scala will be used to forward messages to the Python listeners:

package net.zero323.spark.examples.listener

object Manager {
  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }

}

Finally, a simple SparkListener:

package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers 
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" ->  recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}

Let's package our extension:

sbt package

and start a PySpark session, adding the generated jar to the class path and registering the listener:

 $SPARK_HOME/bin/pyspark \
   --driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
   --conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener

Next we have to define a Python object which implements the Listener interface:

class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]

start the callback server:

sc._gateway.start_callback_server()

create a listener:

listener = PythonListener()

register it:

listener.register()

and test:

>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}

On exit, you should shut down the callback server:

sc._gateway.shutdown_callback_server()

Note:

This should be used with caution when working with Spark Streaming, which internally uses the callback server.
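If you are not sure whether something (for example Spark Streaming) has already started the callback server, Py4j's start_callback_server() returns False when a server is already running, so a guarded start along these lines avoids clashing with it:

started = sc._gateway.start_callback_server()  # False if already running
if not started:
    # e.g. Spark Streaming started it already; reuse it instead of starting a second one
    print("callback server already running")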

Edit:

If this is too much hassle, you could just implement org.apache.spark.scheduler.SparkListenerInterface directly:

class SparkListener(object):
    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]

extend it:

class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())

and use it directly:

>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)

While simpler, this method is not selective (more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
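If you want to keep the simpler approach but reduce the noise, one option (a sketch, not part of the original answer) is to filter and unpack the Java object on the Python side; note that the Scala case-class fields become method calls through the Py4j proxy:

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a Py4j proxy for SparkListenerTaskEnd, so accessors are invoked as methods
        metrics = taskEnd.taskMetrics()
        if metrics is not None:  # metrics may be missing for failed tasks
            print({"recordsWritten": metrics.outputMetrics().recordsWritten()})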

answered Oct 02 '22 by zero323