I want to create a Jupyter/IPython extension to monitor Apache Spark jobs.

Spark provides a REST API. However, instead of polling the server, I want the event updates to be sent through callbacks.

I am trying to register a SparkListener with SparkContext.addSparkListener(). This feature is not available on the PySpark SparkContext object in Python. So how can I register a Python listener with the Scala/Java version of the context from Python? Is it possible to do this through Py4J? I want Python functions to be called when the events fire in the listener.
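For reference, what I want to avoid is a polling loop against the Spark monitoring REST API, roughly like the sketch below (host, port, and application id are placeholders assuming a local driver with the default UI port):

import time
import requests

BASE = "http://localhost:4040/api/v1"  # assumed driver UI address

def poll_jobs(app_id, interval=2.0):
    """Poll job states and print transitions; this is the pull model
    I would like to replace with push-style callbacks."""
    seen = {}
    while True:
        for job in requests.get("{}/applications/{}/jobs".format(BASE, app_id)).json():
            if seen.get(job["jobId"]) != job["status"]:
                seen[job["jobId"]] = job["status"]
                print(job["jobId"], job["status"])
        time.sleep(interval)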
It is possible, although it is a bit involved. We can use the Py4J callback mechanism to pass messages from a SparkListener. First let's create a Scala package with all required classes. Directory structure:
.
├── build.sbt
└── src
    └── main
        └── scala
            └── net
                └── zero323
                    └── spark
                        └── examples
                            └── listener
                                ├── Listener.scala
                                ├── Manager.scala
                                └── TaskListener.scala
build.sbt:
name := "listener"
organization := "net.zero323"
scalaVersion := "2.11.7"
val sparkVersion = "2.1.0"
libraryDependencies ++= List(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "net.sf.py4j" % "py4j" % "0.10.4"  // Just for the record
)
Listener.scala defines a Python interface we are going to implement later:
package net.zero323.spark.examples.listener

/* You can add arbitrary methods here,
 * as long as these match corresponding Python interface
 */
trait Listener {
  /* This will be implemented by a Python class.
   * You can of course use more specific types,
   * for example here String => Unit */
  def notify(x: Any): Any
}
Manager.scala will be used to forward messages to the Python listeners:
package net.zero323.spark.examples.listener

/* Thread-safe registry of listeners keyed by UUID.
 * notifyAll forwards a message to every registered listener. */
object Manager {

  var listeners: Map[String, Listener] = Map()

  def register(listener: Listener): String = {
    this.synchronized {
      val uuid = java.util.UUID.randomUUID().toString
      listeners = listeners + (uuid -> listener)
      uuid
    }
  }

  def unregister(uuid: String) = {
    this.synchronized {
      listeners = listeners - uuid
    }
  }

  def notifyAll(message: String): Unit = {
    for { (_, listener) <- listeners } listener.notify(message)
  }
}
Finally, a simple SparkListener:
package net.zero323.spark.examples.listener

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

/* A simple listener which captures SparkListenerTaskEnd,
 * extracts numbers of records written by the task
 * and converts to JSON. You can of course add handlers
 * for other events as well.
 */
class PythonNotifyListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    val recordsWritten = taskEnd.taskMetrics.outputMetrics.recordsWritten
    val message = compact(render(
      ("recordsWritten" -> recordsWritten)
    ))
    Manager.notifyAll(message)
  }
}
Let's package our extension:
sbt package
and start a PySpark session, adding the generated jar to the class path and registering the listener:
$SPARK_HOME/bin/pyspark \
--driver-class-path target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar \
--conf spark.extraListeners=net.zero323.spark.examples.listener.PythonNotifyListener
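If you start Spark from a notebook instead of the pyspark launcher (the Jupyter use case above), the same settings can be passed when building the session. This is only a sketch assuming the jar path below and that no driver JVM has been started yet in the kernel; spark.driver.extraClassPath has no effect once the JVM is running:

from pyspark.sql import SparkSession

jar = "target/scala-2.11/listener_2.11-0.1-SNAPSHOT.jar"  # assumed location from sbt package

spark = (
    SparkSession.builder
    .appName("listener-demo")
    .config("spark.driver.extraClassPath", jar)
    .config("spark.extraListeners",
            "net.zero323.spark.examples.listener.PythonNotifyListener")
    .getOrCreate()
)
sc = spark.sparkContext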
Next we have to define a Python object which implements the Listener interface:
class PythonListener(object):
    package = "net.zero323.spark.examples.listener"

    @staticmethod
    def get_manager():
        jvm = SparkContext.getOrCreate()._jvm
        manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager"))
        return manager

    def __init__(self):
        self.uuid = None

    def notify(self, obj):
        """This method is required by Scala Listener interface
        we defined above.
        """
        print(obj)

    def register(self):
        manager = PythonListener.get_manager()
        self.uuid = manager.register(self)
        return self.uuid

    def unregister(self):
        manager = PythonListener.get_manager()
        manager.unregister(self.uuid)
        self.uuid = None

    # Py4J marker: declares which Java interfaces this object implements
    class Java:
        implements = ["net.zero323.spark.examples.listener.Listener"]
start callback server:
sc._gateway.start_callback_server()
create a listener:
listener = PythonListener()
register it:
listener.register()
and test:
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")
{"recordsWritten":33}
{"recordsWritten":34}
{"recordsWritten":33}
On exit you should shut down the callback server:
sc._gateway.shutdown_callback_server()
Note:
This should be used with caution when working with Spark Streaming, which internally uses the callback server.
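To keep the start / register / unregister / shutdown steps paired up, for example inside a notebook extension, one option is a small context manager built from the calls shown above. Note it unconditionally shuts the callback server down, which, per the note above, you may not want when streaming is involved:

from contextlib import contextmanager

@contextmanager
def spark_listener(sc, listener):
    """Start the Py4J callback server, register the listener,
    and tear both down on exit."""
    sc._gateway.start_callback_server()
    listener.register()
    try:
        yield listener
    finally:
        listener.unregister()
        sc._gateway.shutdown_callback_server()

# Usage:
# with spark_listener(sc, PythonListener()):
#     sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test")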
Edit:
If this is too much hassle you could just define org.apache.spark.scheduler.SparkListenerInterface:
class SparkListener(object):

    def onApplicationEnd(self, applicationEnd):
        pass
    def onApplicationStart(self, applicationStart):
        pass
    def onBlockManagerRemoved(self, blockManagerRemoved):
        pass
    def onBlockUpdated(self, blockUpdated):
        pass
    def onEnvironmentUpdate(self, environmentUpdate):
        pass
    def onExecutorAdded(self, executorAdded):
        pass
    def onExecutorMetricsUpdate(self, executorMetricsUpdate):
        pass
    def onExecutorRemoved(self, executorRemoved):
        pass
    def onJobEnd(self, jobEnd):
        pass
    def onJobStart(self, jobStart):
        pass
    def onOtherEvent(self, event):
        pass
    def onStageCompleted(self, stageCompleted):
        pass
    def onStageSubmitted(self, stageSubmitted):
        pass
    def onTaskEnd(self, taskEnd):
        pass
    def onTaskGettingResult(self, taskGettingResult):
        pass
    def onTaskStart(self, taskStart):
        pass
    def onUnpersistRDD(self, unpersistRDD):
        pass

    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
extend it:
class TaskEndListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        print(taskEnd.toString())
and use it directly:
>>> sc._gateway.start_callback_server()
True
>>> listener = TaskEndListener()
>>> sc._jsc.sc().addSparkListener(listener)
>>> sc.parallelize(range(100), 3).saveAsTextFile("/tmp/listener_test_simple")
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@9e7514a,org.apache.spark.executor.TaskMetrics@51b8ba92)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@71278a44,org.apache.spark.executor.TaskMetrics@bdc06d)
SparkListenerTaskEnd(0,0,ResultTask,Success,org.apache.spark.scheduler.TaskInfo@336)
While simpler, this method is not selective (more traffic between the JVM and Python) and requires handling Java objects inside the Python session.
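Handling those Java objects mostly means calling their accessors through Py4J. A sketch of a slightly more selective variant, assuming the accessor chain mirrors the Scala code above (taskMetrics, outputMetrics, recordsWritten exposed as no-argument methods on the proxy):

class RecordsWrittenListener(SparkListener):
    def onTaskEnd(self, taskEnd):
        # taskEnd is a Py4J proxy for SparkListenerTaskEnd; case-class fields
        # become no-argument method calls on the Python side.
        metrics = taskEnd.taskMetrics()
        if metrics is not None:
            print({"recordsWritten": metrics.outputMetrics().recordsWritten()})

# Register it the same way:
# sc._jsc.sc().addSparkListener(RecordsWrittenListener())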