Using Delta table streaming, I am trying to write a stream using foreachBatch:
df.writeStream
    .format("delta")
    .foreachBatch(WriteStreamToDelta)
    ...
WriteStreamToDelta looks like:
def WriteStreamToDelta(microDF, batch_id):
    microDFWrangled = microDF."some_transformations"
    print(microDFWrangled.count())  # <-- How do I achieve the equivalent of this?
    microDFWrangled.write...
I would like to view the number of rows in each micro-batch.
Running microDFWrangled.count() on every micro-batch is relatively expensive. A more efficient approach is to use a StreamingQueryListener, which can send output to the console, the driver logs, an external database, etc.
StreamingQueryListener is efficient because it relies on Spark's internal streaming statistics, so no extra computation is needed just to get the record count.
However, for PySpark this feature is available in Databricks starting with DBR 11.0, and in OSS Spark only since release 3.4.
Reference: https://www.databricks.com/blog/2022/05/27/how-to-monitor-streaming-queries-in-pyspark.html
If you still want to send the output using print(), consider adding .awaitTermination() as the last chained call:
df.writeStream
    .format("delta")
    .foreachBatch(WriteStreamToDelta)
    .start()
    .awaitTermination()