
Structured Streaming - Foreach Sink

I am basically reading from a Kafka source and dumping each message through to my foreach processor (thanks to Jacek's page for the simple example).

If this works, I will then perform some actual business logic in the process method here. However, it doesn't work. I believe the println doesn't show anything because it runs on the executors and there is no way to get those logs back to the driver. Still, the insert into a temp table should at least work and show me that the messages are actually being consumed and processed through to the sink.

What am I missing here?

Really looking for a second set of eyes to check my effort here:

    val stream = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
      .option("subscribe", src_topic)
      .load()

    val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[String]  // unused below

    val df = stream.selectExpr("cast (value as string) as json")

    val writer = new ForeachWriter[Row] {
      val scon = new SConConnection
      override def open(partitionId: Long, version: Long) = {
        true
      }
      override def process(value: Row) = {
        println("++++++++++++++++++++++++++++++++++++" + value.get(0))
        scon.executeUpdate("insert into rs_kafka10(miscCol) values(" + value.get(0) + ")")
      }
      override def close(errorOrNull: Throwable) = {
        scon.closeConnection
      }
    }

    val yy = df.writeStream
      .queryName("ForEachQuery")
      .foreach(writer)
      .outputMode("append")
      .start()

    yy.awaitTermination()
asked May 26 '17 by Raghav

People also ask

What is sink in Spark structured streaming?

Spark Structured Streaming defines the Sink trait, representing the interface for external storage systems that can collect the results of a streaming query. The Sink trait defines only one method, addBatch, which takes the batchId and the DataFrame representing the batch data as arguments.
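For a feel of that interface, here is a minimal sketch. Note that Sink lives in the internal org.apache.spark.sql.execution.streaming package in Spark 2.x, so this is illustrative rather than a supported extension point:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.execution.streaming.Sink

    // Illustrative only: Sink is an internal Spark API, so treat this
    // as a sketch of the shape rather than a stable extension point.
    class LoggingSink extends Sink {
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        // batchId identifies the micro-batch; data holds its rows
        println(s"batch $batchId: ${data.count()} rows")
      }
    }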

What is a sink in streaming?

sink - the property that takes input into the Stream. stream - the property that gives output out of the Stream.

What is difference between DStream and structured streaming?

Internally, a DStream is a sequence of RDDs. Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing.
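To make the contrast concrete, here is a sketch reading the same socket source with both APIs (host and port are placeholders):

    import org.apache.spark.SparkContext
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.sql.SparkSession

    // DStream API: an explicit batch interval, yielding a sequence of RDDs
    val sc = SparkContext.getOrCreate()
    val ssc = new StreamingContext(sc, Seconds(10))
    val lines = ssc.socketTextStream("localhost", 9999)  // DStream[String]

    // Structured Streaming: an unbounded DataFrame on the SparkSQL API
    val spark = SparkSession.builder.getOrCreate()
    val df = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()                                            // streaming DataFrame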

Is structured streaming exactly once?

Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
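In practice that guarantee is anchored by the checkpoint location; applied to a query like the one in the question, it is just one extra option (the path below is a placeholder):

    // Offsets and state are persisted under checkpointLocation, so a
    // restarted query resumes exactly where it stopped.
    val query = df.writeStream
      .queryName("ForEachQuery")
      .option("checkpointLocation", "/tmp/chk/foreach-query")
      .foreach(writer)
      .outputMode("append")
      .start()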


1 Answer

Thanks to comments from Harald and others, I found out a couple of things which led me to normal processing behaviour:

  1. Test the code in local mode; YARN isn't the biggest help in debugging.
  2. For some reason, the process method of the foreach sink doesn't allow calling other methods. When I put my business logic directly in there, it works (see the sketch after this list).
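For concreteness, roughly the shape that worked for me. This is a sketch, not verified line by line: SConConnection is my own helper from the question, and creating it in open() (on the executor, once per partition) is an assumption here:

    import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

    // Finding 1: a local master, so executor-side printlns and stack
    // traces all land in one console while debugging.
    val spark = SparkSession.builder
      .master("local[*]")
      .appName("ForEachDebug")
      .getOrCreate()

    val writer = new ForeachWriter[Row] {
      var scon: SConConnection = _   // assumption: opened per partition in open()

      override def open(partitionId: Long, version: Long): Boolean = {
        scon = new SConConnection
        true
      }

      override def process(value: Row): Unit = {
        // Finding 2: business logic inline, not delegated to another method
        scon.executeUpdate("insert into rs_kafka10(miscCol) values(" + value.get(0) + ")")
      }

      override def close(errorOrNull: Throwable): Unit = {
        if (scon != null) scon.closeConnection
      }
    }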

Hope it helps others.

answered Oct 13 '22 by Raghav