I've recently realized I'm confused about something in Spark Streaming (I'm currently learning Spark).
I am reading data from a Kafka topic like this:
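import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)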
This creates a DStream.
To work with event time (rather than processing time), I did this:
outputStream
  .foreachRDD(rdd => {
    rdd.toDF()
      .withWatermark("timestamp", "60 seconds")
      .groupBy(
        window($"timestamp", "60 seconds", "10 seconds")
      )
      .sum("meterIncrement")
      .toJSON
      .toDF("value")
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "taxi-dollar-accurate")
      .start()
  })
This fails with the error:
'writeStream' can be called only on streaming Dataset/DataFrame
This surprised me, because the DataFrame is built from a DStream. Anyway, I managed to work around it by changing .writeStream to .write and .start() to .save().
But I have the feeling that, by doing this inside foreachRDD, I somehow lost the streaming capabilities, which is why I'm asking. Is this a correct approach? I've seen other scripts that use
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
but I don't know how this differs from calling foreachRDD on the DStream and converting each RDD to a DataFrame.
When you are calling:
outputStream
.foreachRDD(rdd => {
rdd.toDF()
.[...]
.toJSON
.toDF("value")
.writeStream
.format("kafka")
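your variable rdd is a single RDD holding the records of one batch interval, and the DataFrame you build from it is a static (batch) DataFrame, not a streaming one. Hence rdd.toDF.[...].writeStream cannot work.
For the same reason, the windowed aggregation in that code only sees the records of the current micro-batch: withWatermark has no effect on a batch DataFrame, and no aggregation state is carried over to the next batch, so a 60-second window that spans several batch intervals ends up as several separate partial sums.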
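Whether writeStream is allowed is a property of each individual DataFrame, which Spark exposes as Dataset.isStreaming. The minimal sketch below compares a DataFrame built from an RDD (what you hold inside foreachRDD) with one created through readStream. It assumes Spark 2.2 or later, because it uses the built-in "rate" source only as a stand-in that needs no external system; the object name is hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object IsStreamingCheck {
  // Returns (isStreaming of an RDD-derived DataFrame, isStreaming of a readStream DataFrame).
  def check(spark: SparkSession): (Boolean, Boolean) = {
    import spark.implicits._
    // Same kind of DataFrame you get inside foreachRDD: static, built from one RDD.
    val fromRdd = spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF("n")
    // A streaming DataFrame; defining it does not start any query.
    val fromReadStream = spark.readStream.format("rate").load()
    (fromRdd.isStreaming, fromReadStream.isStreaming)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("is-streaming-check").getOrCreate()
    println(check(spark)) // prints (false,true)
    spark.stop()
  }
}
```

Only the second DataFrame could be passed on to .writeStream.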
If you choose to use the DStream approach, you can send those individual RDDs to Kafka by calling the KafkaProducer API.
An example:
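import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Assumes dstream is a DStream[String], kafkaParameters is a java.util.Properties
// with bootstrap.servers and String serializers set, and topic (hypothetical) names the target topic.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = new KafkaProducer[String, String](kafkaParameters)
    partitionOfRecords.foreach { message =>
      producer.send(new ProducerRecord[String, String](topic, message))
    }
    producer.close()
  }
}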
However, this is not the recommended approach, because a KafkaProducer is created and closed for every partition in every batch interval. Still, it shows the basic idea of how to write data to Kafka from the DirectStream (DStream) API.
To further optimize sending your data to Kafka you can follow the guidance given here.
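One widely used optimization is to broadcast a small serializable wrapper that creates the producer lazily, so that typically each executor builds one KafkaProducer and reuses it across batches instead of creating one per partition and batch. The minimal sketch below assumes the kafka-clients and spark-streaming libraries are on the classpath; KafkaSink, KafkaSinkUsage, and their parameters are hypothetical names, not part of Spark or Kafka.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Only the factory function is serialized; the producer is created on first use
// in each deserialized copy (typically one copy per executor when broadcast).
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  @transient lazy val producer: KafkaProducer[String, String] = createProducer()

  def send(topic: String, value: String): Unit =
    producer.send(new ProducerRecord[String, String](topic, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val factory = () => {
      val producer = new KafkaProducer[String, String](config)
      // Flush pending records and close the producer when the JVM shuts down.
      sys.addShutdownHook { producer.close() }
      producer
    }
    new KafkaSink(factory)
  }
}

object KafkaSinkUsage {
  // Hypothetical wiring: messages, producerProps and topic stand in for your own values.
  def wire(messages: DStream[String], producerProps: Properties, topic: String): Unit = {
    val sink = messages.context.sparkContext.broadcast(KafkaSink(producerProps))
    messages.foreachRDD { rdd =>
      rdd.foreach(message => sink.value.send(topic, message))
    }
  }
}
```

The trade-off is that the producer lives as long as the executor and is closed only by the shutdown hook, rather than after every batch.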
Alternatively, you can convert each RDD into a DataFrame, but you then have to use the batch-oriented API to write the data to Kafka:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
All the details on writing a batch DataFrame to Kafka are given in the Spark Structured Streaming + Kafka Integration Guide.
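If you do take this route, the batch write has to happen inside foreachRDD, on a DataFrame built from each micro-batch's RDD. The minimal sketch below uses the SparkSession getOrCreate pattern from the Spark Streaming programming guide and assumes the stream has already been mapped to (key, value) string pairs, for example with stream.map(record => (record.key, record.value)); the object and method names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream

object BatchWritePerMicroBatch {
  def writeToKafka(pairs: DStream[(String, String)], servers: String, topic: String): Unit =
    pairs.foreachRDD { rdd =>
      // Get (or lazily create) the singleton SparkSession for this SparkContext.
      val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      import spark.implicits._

      rdd.toDF("key", "value") // a static DataFrame holding one micro-batch
        .write                 // batch API: .write/.save instead of .writeStream/.start
        .format("kafka")
        .option("kafka.bootstrap.servers", servers)
        .option("topic", topic)
        .save()
    }
}
```

Each micro-batch becomes an independent batch job here, so any aggregation you add before .write still only covers that one batch.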
Still, and most importantly, I strongly recommend not mixing the RDD (DStream) API and the Structured API in a case like this; stick to one or the other.
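For comparison, here is a minimal sketch of the question's pipeline written entirely with Structured Streaming, where the window state is kept across micro-batches and the watermark is honored. It assumes Spark 2.2 or later with the spark-sql-kafka-0-10 package on the classpath, a hypothetical input topic taxi-events whose values are JSON objects with a timestamp field Spark can parse as a timestamp and a numeric meterIncrement field, and a hypothetical checkpoint path; adjust these to your setup.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json, window}
import org.apache.spark.sql.types.{DoubleType, StructType, TimestampType}

object TaxiDollarsStructured {
  // Assumed shape of each Kafka message value (JSON).
  val eventSchema: StructType = new StructType()
    .add("timestamp", TimestampType)
    .add("meterIncrement", DoubleType)

  // Event-time aggregation: 60-second windows sliding every 10 seconds.
  // Watermark = max event time seen minus 60 seconds; in append mode a window is
  // emitted, and its state dropped, once the watermark passes the window end.
  def aggregate(events: DataFrame): DataFrame =
    events
      .withWatermark("timestamp", "60 seconds")
      .groupBy(window(col("timestamp"), "60 seconds", "10 seconds"))
      .sum("meterIncrement")
      .toJSON
      .toDF("value")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("taxi-dollar-accurate").getOrCreate()

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "taxi-events") // hypothetical input topic
      .load()
      .select(from_json(col("value").cast("string"), eventSchema).as("e"))
      .select("e.*")

    val query = aggregate(events).writeStream
      .format("kafka")
      .outputMode("append")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "taxi-dollar-accurate")
      .option("checkpointLocation", "/tmp/taxi-dollar-accurate-checkpoint") // hypothetical path
      .start()

    query.awaitTermination()
  }
}
```

Keeping the transformation in aggregate separate from the Kafka wiring makes it easy to try out on a small batch DataFrame, where Spark simply ignores the watermark.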