Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to get the output from console streaming sink in Zeppelin?

I'm struggling to get the console sink working with PySpark Structured Streaming when run from Zeppelin. Basically, I'm not seeing any results printed to the screen, or to any logfiles I've found.

My question: Does anyone have a working example of using PySpark Structured Streaming with a sink that produces output visible in Apache Zeppelin? Ideally it would also use the socket source, as that's easy to test with.

I'm using:

  • Ubuntu 16.04
  • spark-2.2.0-bin-hadoop2.7
  • zeppelin-0.7.3-bin-all
  • Python3

I've based my code on the structured_network_wordcount.py example. It works when run from the PySpark shell (./bin/pyspark --master local[2]); I see tables per batch.

%pyspark
# structured streaming
from pyspark.sql.functions import *
lines = spark\
    .readStream\
    .format('socket')\
    .option('host', 'localhost')\
    .option('port', 9999)\
    .option('includeTimestamp', 'true')\
    .load()

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, '10 seconds', '1 seconds'),
    words.word
).count().orderBy('window')

# Start running the query that prints the windowed word counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('complete')\
    .format('console')\
    .option('truncate', 'false')\
    .start()

print("Starting...")
query.awaitTermination(20)

I'd expect to see printouts of results for each batch, but instead I just see Starting..., and then False, the return value of query.awaitTermination(20).

In a separate terminal I'm entering some data into a nc -lk 9999 netcat session while the above is running.

like image 276
m01 Avatar asked Nov 17 '17 18:11

m01


People also ask

What is the console output sink?

We covered the console output sink multiple times in a previous post. The file sink stores the contents of a streaming DataFrame to a specified directory and format. We use initDf (created above) and apply a simple transformation before storing it to the file system.

What is console sink in Spark Streaming?

In Spark Streaming, output sinks store results into external storage. Console sink: Displays the content of the DataFrame to console. In this series, we have only used console sink, refer to previous posts for details.

How to get the standard output stream from a console?

Given a normal console, the task is to get the Standard Output Stream through this Console in C#. Approach: This can be done using the Out property in the Console class of the System package in C#. Note: The TextWriter represents the standard output stream.

What is complete output mode in streaming?

Streaming – Complete Output Mode OutputMode in which all the rows in the streaming DataFrame/Dataset will be written to the sink every time there are some updates. Use complete as output mode outputMode ("complete") when you want to aggregate the data and output the entire results to sink every time.


1 Answers

Console sink is not a good choice for interactive notebook-based workflow. Even in Scala, where the output can be captured, it requires awaitTermination call (or equivalent) in the same paragraph, effectively blocking the note.

%spark

spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .option("includeTimestamp", "true")
  .load()
  .writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", "false")
  .start()
  .awaitTermination() // Block execution, to force Zeppelin to capture the output

Chained awaitTermination could be replaced with standalone call in the same paragraph would work as well:

%spark

val query = df
  .writeStream
  ...
  .start()

query.awaitTermination()

Without it, Zeppelin has no reason to wait for any output. PySpark just adds another problem on top of that - indirect execution. Because of that, even blocking the query won't help you here.

Moreover continuous output from the stream can cause rendering issues and memory problems when browsing the note (it might be possible to use Zeppelin display system via InterpreterContext or REST API, to achieve a bit more sensible behavior, where the output is overwritten or periodically cleared).

A much better choice for testing with Zeppelin is memory sink. This way you can start a query without blocking:

%pyspark

query = (windowedCounts
  .writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("some_name")
  .start())

and query the result on demand in another paragraph:

%pyspark

spark.table("some_name").show()

It can be coupled with reactive streams or similar solution to provide interval based updates.

It is also possible to use StreamingQueryListener with Py4j callbacks to couple rx with onQueryProgress events, although query listeners are not supported in PySpark, and require a bit of code, to glue things together. Scala interface:

package com.example.spark.observer

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

trait PythonObserver {
  def on_next(o: Object): Unit
}

class PythonStreamingQueryListener(observer: PythonObserver) 
    extends StreamingQueryListener {
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    observer.on_next(event)
  }
  override def onQueryStarted(event: QueryStartedEvent): Unit = {}
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}

build a jar, adjusting build definition to reflect desired Scala and Spark version:

scalaVersion := "2.11.8"  

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-streaming" % sparkVersion
)

put it on the Spark classpath, patch StreamingQueryManager:

%pyspark

from pyspark.sql.streaming import StreamingQueryManager
from pyspark import SparkContext

def addListener(self, listener):
    jvm = SparkContext._active_spark_context._jvm
    jlistener = jvm.com.example.spark.observer.PythonStreamingQueryListener(
        listener
    )
    self._jsqm.addListener(jlistener)
    return jlistener


StreamingQueryManager.addListener = addListener

start callback server:

%pyspark

sc._gateway.start_callback_server()

and add listener:

%pyspark

from rx.subjects import Subject

class StreamingObserver(Subject):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

observer = StreamingObserver()
spark.streams.addListener(observer)

Finally you can use subscribe and block execution:

%pyspark

(observer
    .map(lambda p: p.progress().name())
    # .filter() can be used to print only for a specific query
    .subscribe(lambda n: spark.table(n).show() if n else None))
input()  # Block execution to capture the output 

The last step should be executed after you started streaming query.

It is also possible to skip rx and use minimal observer like this:

class StreamingObserver(object):
    class Java:
        implements = ["com.example.spark.observer.PythonObserver"]

    def on_next(self, value):
        try:
            name = value.progress().name()
            if name:
                spark.table(name).show()
        except: pass

It gives a bit less control than the Subject (one caveat is that this can interfere with other code printing to stdout and can be stopped only by removing listener. With Subject you can easily dispose subscribed observer, once you're done), but otherwise should work more or less the same.

Note that any blocking action will be sufficient to capture the output from the listener and it doesn't have to be executed in the same cell. For example

%pyspark

observer = StreamingObserver()
spark.streams.addListener(observer)

and

%pyspark

import time

time.sleep(42)

would work in a similar way, printing table for a defined time interval.

For completeness you can implement StreamingQueryManager.removeListener.

like image 161
zero323 Avatar answered Sep 29 '22 12:09

zero323