I am basically reading from a Kafka source and dumping each message through to my foreach processor (thanks to Jacek's page for the simple example).
If this works, I intend to perform some real business logic in the process method; however, it doesn't work. I believe the println produces no visible output because it runs on the executors, so there is no way to get those logs back to the driver. However, the insert into a temp table should at least work and show me that the messages are actually consumed and processed through to the sink.
What am I missing here? I'm really looking for a second set of eyes to check my effort:
import org.apache.spark.sql.{ForeachWriter, Row}
import spark.implicits._

// Read the raw Kafka records as a streaming DataFrame
val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
  .option("subscribe", src_topic)
  .load()

val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[String]

val df = stream.selectExpr("cast (value as string) as json")

// open/process/close are invoked on the executors, not on the driver
val writer = new ForeachWriter[Row] {
  val scon = new SConConnection
  override def open(partitionId: Long, version: Long) = {
    true
  }
  override def process(value: Row) = {
    println("++++++++++++++++++++++++++++++++++++" + value.get(0))
    scon.executeUpdate("insert into rs_kafka10(miscCol) values(" + value.get(0) + ")")
  }
  override def close(errorOrNull: Throwable) = {
    scon.closeConnection
  }
}

val yy = df.writeStream
  .queryName("ForEachQuery")
  .foreach(writer)
  .outputMode("append")
  .start()

yy.awaitTermination()
Spark Structured Streaming defines the Sink trait, which represents the interface for external storage systems that can collect the results of a streaming query. The Sink trait defines only one method, addBatch, which takes the batchId and a DataFrame representing the batch data as arguments.
Put simply, a sink is the endpoint that consumes a query's output, while a stream (the source) is the endpoint that produces the data flowing into the query.
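For reference, a bare-bones custom sink might look roughly like the sketch below. Note that Sink lives in the internal package org.apache.spark.sql.execution.streaming, and the LoggingSink class name and its println body are made up purely for illustration:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.Sink

// Hypothetical sketch of a Sink: Spark calls addBatch once per micro-batch,
// passing the batch id and a DataFrame with that batch's rows.
class LoggingSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    // A real sink would write the rows out idempotently, using batchId
    // to skip batches that were already committed.
    val rows = data.collect()
    println(s"batch $batchId: ${rows.length} rows")
  }
}

Wiring such a sink into a query goes through a StreamSinkProvider registered as a data source format, which is more involved than the foreach sink used above.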
Internally, a DStream is a sequence of RDDs: Spark receives real-time data and divides it into small batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing.
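To make the contrast concrete, here is a rough side-by-side sketch; the socket source, host, and port are placeholders used only for illustration:

import org.apache.spark.streaming.{Seconds, StreamingContext}

// DStream API: an explicit StreamingContext slices the input into RDD micro-batches
val ssc = new StreamingContext(spark.sparkContext, Seconds(5))
val dstreamLines = ssc.socketTextStream("localhost", 9999)
dstreamLines.foreachRDD(rdd => println(s"got ${rdd.count()} lines"))

// Structured Streaming: the same input is exposed as an unbounded DataFrame
val streamedLines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()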
Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
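For the query above, that means supplying a checkpoint location so Kafka offsets and sink progress survive restarts. A minimal sketch against the earlier code, where the path is just a placeholder:

// Sketch: add a checkpoint location so offsets and progress are tracked
// in a write-ahead log and recovered on restart.
val query = df.writeStream
  .queryName("ForEachQuery")
  .foreach(writer)
  .outputMode("append")
  .option("checkpointLocation", "/tmp/checkpoints/rs_kafka10")
  .start()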
Thanks to the comments from Harald and others, I found out a couple of things that led me to normal processing behaviour. Hope it helps others.