Structured Streaming output is not showing on Jupyter Notebook

I have two notebooks. The first notebook reads tweets from Twitter using tweepy and writes them to a socket. The other notebook reads the tweets from that socket using Spark Structured Streaming (Python) and writes the result to the console. Unfortunately, I don't get any output in the Jupyter notebook, although the same code works fine in PyCharm.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StructuredStreaming") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# Spark Structured Streaming code that reads the tweets from the socket and prints them to the console.
tweets = spark \
    .readStream \
    .format("socket") \
    .option("host", "127.0.0.1") \
    .option("port", 7000) \
    .load()

query = tweets \
    .writeStream \
    .option("truncate", "false") \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
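For reference, the first notebook only has to act as a TCP server: Spark's socket source connects to 127.0.0.1:7000 as a client and turns every newline-separated line it receives into one row. A minimal sketch of that producer side, assuming the tweet texts are already available as an iterable; `serve_lines` is a hypothetical helper and the tweepy part is deliberately left out.

```python
import socket


def serve_lines(lines, host="127.0.0.1", port=7000):
    """Accept a single client (e.g. Spark's socket source) and send every
    item of `lines` as one newline-terminated UTF-8 line, then close."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow re-running the notebook cell on the same port quickly
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        conn, _addr = server.accept()  # blocks until the Spark query connects
        with conn:
            for text in lines:
                # A newline inside a tweet would split it into two rows
                one_line = text.replace("\n", " ")
                conn.sendall((one_line + "\n").encode("utf-8"))
```

In the first notebook this would be called as `serve_lines(tweet_texts)`, where `tweet_texts` is a hypothetical iterable fed by tweepy. The server has to be listening before the streaming query in the second notebook starts, because the socket source opens its connection when the query starts.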
Asked Oct 27 '22 23:10 by Abdul Haseeb



1 Answer

I am not sure that the output of the console sink can be shown in Jupyter Notebook. However, you can use the memory sink to achieve similar results. This is simple in complete mode, but might require some changes for append mode.
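To make the difference between the two output modes concrete, here is a plain-Python simulation that does not use Spark at all; `simulate` and the per-line count used as the aggregation are illustrative assumptions. In append mode only the rows that arrived in the current micro-batch are added to the result table, while in complete mode the whole aggregated table is rewritten on every trigger, which is why Spark only accepts complete mode for aggregated queries.

```python
from collections import Counter


def simulate(batches):
    """Plain-Python illustration (not Spark) of what each trigger writes.

    append:   only the rows that arrived in this micro-batch
    complete: the whole aggregated result table (here: count per line),
              rewritten in full on every trigger
    """
    counts = Counter()
    for batch_id, rows in enumerate(batches):
        counts.update(rows)
        appended = list(rows)
        complete = sorted(counts.items())
        yield batch_id, appended, complete
```

For the two micro-batches `[["a", "b"], ["a"]]`, the second trigger appends only `["a"]`, whereas the complete table becomes `[("a", 2), ("b", 1)]`.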

For the complete mode

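In complete output mode, your query should look more or less as follows. Note that Spark only supports complete mode for queries with a streaming aggregation, so the raw tweets have to be aggregated first (counting identical lines is just an example):

# Complete mode needs an aggregation; counting identical lines is only an example
counts = tweets.groupBy("value").count()

query = counts \
    .writeStream \
    .outputMode("complete") \
    .format("memory") \
    .queryName("your_query_name") \
    .start()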

Notice that there's no query.awaitTermination() at the end, because it would block the cell until the query stops. Now query the your_query_name in-memory table in another cell and watch the continuously updated results for as long as you want:

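from time import sleep
from IPython.display import display, clear_output

# Refresh the cell output once per second; interrupt the kernel to stop
while True:
    clear_output(wait=True)
    display(query.status)
    # show() prints the table itself and returns None, so it is called directly
    spark.sql('SELECT * FROM your_query_name').show(truncate=False)
    sleep(1)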
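The loop above runs until you interrupt the kernel. A bounded variant is sketched below; `watch` is a hypothetical helper that takes the fetch and render steps as functions and stops when `is_active()` returns False (for example `lambda: query.isActive`), after `max_refreshes` refreshes, or on an interrupt. Interrupting the cell does not stop the streaming query itself.

```python
import time


def watch(fetch, render, is_active=lambda: True, interval=1.0, max_refreshes=None):
    """Repeatedly fetch and render a snapshot; return how many were shown."""
    refreshes = 0
    try:
        while is_active() and (max_refreshes is None or refreshes < max_refreshes):
            render(fetch())
            refreshes += 1
            time.sleep(interval)
    except KeyboardInterrupt:
        pass  # stop refreshing quietly; the streaming query keeps running
    return refreshes


# Possible usage in the notebook (assumes spark, query and your_query_name):
# from IPython.display import clear_output
# def render(df):
#     clear_output(wait=True)
#     df.show(truncate=False)
# watch(lambda: spark.sql("SELECT * FROM your_query_name"), render,
#       is_active=lambda: query.isActive)
```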

For the append mode

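If you'd like to use append output mode, a query without aggregations, such as the raw tweets, can be written to the memory sink as it is. Aggregations are possible in append mode only on an event-time column with a watermark, and each windowed result is written only once the watermark has passed the end of its window, so your code might require some further changes.

# withWatermark is only required if you aggregate; "timestampColumn" is a placeholder
# for your event-time column (the socket source adds one named "timestamp" when
# .option("includeTimestamp", True) is set on readStream)
query = tweets \
    .withWatermark("timestampColumn", "3 minutes") \
    .writeStream \
    .outputMode("append") \
    .format("memory") \
    .queryName("your_query_name") \
    .start()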
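The role of the watermark in append mode can be illustrated with a simplified, plain-Python model of a windowed count. It is not Spark itself; the integer event times, `window`, `delay` and `append_with_watermark` are assumptions for illustration. The watermark trails the largest event time seen so far by `delay`; a window is written exactly once, when the watermark reaches its end, and rows arriving for an already finalized window are dropped. Spark's real bookkeeping differs in details, such as which micro-batch actually emits a window.

```python
from collections import defaultdict


def append_with_watermark(batches, window, delay):
    """Simplified model (not Spark) of a windowed count in append mode.

    Each batch is a list of integer event times. After every batch the
    watermark becomes max_event_time_seen - delay; a window
    [start, start + window) is emitted once the watermark reaches its end.
    Returns a list of (batch_id, window_start, count).
    """
    counts = defaultdict(int)
    max_seen = None
    watermark = None
    emitted = []
    for batch_id, times in enumerate(batches):
        for t in times:
            start = t - t % window
            if watermark is not None and start + window <= watermark:
                continue  # too late: this window was already finalized
            counts[start] += 1
            max_seen = t if max_seen is None else max(max_seen, t)
        if max_seen is not None:
            watermark = max_seen - delay
        # sorted() copies the keys, so popping inside the loop is safe
        for start in sorted(counts):
            if start + window <= watermark:
                emitted.append((batch_id, start, counts.pop(start)))
    return emitted
```

With `window=10` and `delay=5`, the batches `[[1, 4, 12], [8, 17], [3, 26]]` produce `[(1, 0, 3), (2, 10, 2)]`: the window starting at 0 is only written in the second batch, and the late event 3 in the third batch is dropped.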

The code for display stays the same. You can also display query.lastProgress in a similar fashion for more detailed information about the latest micro-batch.
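A compact way to show `query.lastProgress` inside the refresh loop is to pick a few of its fields. This sketch assumes `lastProgress` returns a plain dict (as in PySpark 3.x), or None before the first micro-batch has completed; `summarize_progress` is a hypothetical helper, and the fields it reads are `batchId`, `numInputRows`, `inputRowsPerSecond` and `processedRowsPerSecond`.

```python
def summarize_progress(progress):
    """Format a query.lastProgress dict as one short line of text."""
    if not progress:
        # lastProgress is None until the first micro-batch has finished
        return "no progress reported yet"
    return (
        f"batch {progress['batchId']}: {progress['numInputRows']} rows, "
        f"{progress.get('inputRowsPerSecond', 0.0):.1f} in/s, "
        f"{progress.get('processedRowsPerSecond', 0.0):.1f} processed/s"
    )
```

Inside the loop this would be used as `print(summarize_progress(query.lastProgress))`.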

Inspirations and references

  • How to get the output from console streaming sink in Zeppelin?
  • Overwrite previous output in jupyter notebook
Answered Nov 09 '22 18:11 by Dominik Filipiak
