 

Mixing Spark Structured Streaming API and DStream to write to Kafka

I've recently noticed that I'm confused about something in Spark Streaming (I'm currently learning Spark).

I am reading data from a Kafka topic like this:

val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )

Which creates a DStream.
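For context, kafkaParams and topics in the snippet above are defined along these lines (a minimal sketch following the Spark Streaming + Kafka integration guide; the broker address, group id and topic name are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Placeholder consumer configuration for the direct stream
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "taxi-consumer-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("taxi-dollar")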

In order to work with event-time (and not processing-time) I did this:

outputStream
      .foreachRDD(rdd => {
          rdd.toDF().withWatermark("timestamp", "60 seconds")
            .groupBy(
              window($"timestamp", "60 seconds", "10 seconds")
            )
            .sum("meterIncrement")
            .toJSON
            .toDF("value")
            .writeStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("topic", "taxi-dollar-accurate")
            .start()
      })

And I get the error

'writeStream' can be called only on streaming Dataset/DataFrame

This surprised me, because the source of the DataFrame is a DStream. Anyway, I managed to get past the error by changing .writeStream to .write and .start() to .save().

But I get the feeling that I somehow lose the streaming capabilities inside that foreach, which is why I am writing this question. Is this a correct approach? I've seen other scripts that use

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

But I don't know how different this is from just calling foreachRDD on the DStream and then transforming each RDD to a DataFrame.

asked by Codigo Morsa

1 Answer

But I don't know how different this is from just calling foreachRDD on the DStream and then transforming each RDD to a DataFrame.

When you are calling:

outputStream
      .foreachRDD(rdd => {
          rdd.toDF()
            .[...]
            .toJSON
            .toDF("value")
            .writeStream
            .format("kafka")

your variable rdd (and the DataFrame derived from it) is a single, static RDD for that micro-batch, not a stream anymore. Hence, rdd.toDF().[...].writeStream will not work.
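You can check this directly: inside foreachRDD the converted DataFrame reports isStreaming = false (a quick illustrative snippet reusing outputStream from your question):

outputStream.foreachRDD { rdd =>
  val df = rdd.toDF()        // a static, batch DataFrame for this micro-batch only
  println(df.isStreaming)    // prints "false", which is why writeStream is rejected
}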

Continue with RDD

If you choose to stay with the DStream approach, you can send the records of those individual RDDs to Kafka by calling the KafkaProducer API directly.

An example:

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Create a producer on the executor that processes this partition
    val producer = new KafkaProducer[String, String](kafkaParameters)
    partitionOfRecords.foreach { message =>
      // Wrap each record in a ProducerRecord addressed to the target topic
      producer.send(new ProducerRecord[String, String]("taxi-dollar-accurate", message))
    }
    producer.close()
  }
}

However, this is not the recommended approach, as you create and close a KafkaProducer in every batch interval on each executor. Still, it should give you a basic understanding of how to write data to Kafka using the DirectStream API.

To further optimize sending your data to Kafka, you can follow the design patterns for using foreachRDD described in the Spark Streaming programming guide.
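One common pattern from that guidance is to create the producer lazily, once per executor JVM, and reuse it across batches instead of opening and closing it for every partition. A rough sketch of that idea (the KafkaSink helper and the producer properties below are illustrative, not part of any Spark or Kafka API):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative producer configuration; adjust broker address and serializers to your setup
val kafkaProducerProps = new Properties()
kafkaProducerProps.put("bootstrap.servers", "localhost:9092")
kafkaProducerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProducerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

// Illustrative helper: one lazily created producer per executor JVM, reused across batches
object KafkaSink {
  private var producer: KafkaProducer[String, String] = _

  def get(props: Properties): KafkaProducer[String, String] = synchronized {
    if (producer == null) {
      producer = new KafkaProducer[String, String](props)
      sys.addShutdownHook { producer.close() }   // flush buffered records on executor shutdown
    }
    producer
  }
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = KafkaSink.get(kafkaProducerProps)   // reused, not recreated per batch
    partitionOfRecords.foreach { message =>
      producer.send(new ProducerRecord[String, String]("taxi-dollar-accurate", message))
    }
  }
}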

Continue with Dataframe

Alternatively, you can transform each RDD into a DataFrame, but then make sure to call the batch-oriented API to write the data to Kafka:

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

All the details on how to write a batch DataFrame to Kafka are given in the Spark Structured Streaming + Kafka Integration Guide.
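Put together with foreachRDD, this batch-oriented write would look roughly as follows (a sketch that carries over the column names, topic and aggregation from your question; withWatermark is dropped because it has no effect on a static DataFrame, so every micro-batch is aggregated independently):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.window

outputStream.foreachRDD { rdd =>
  val spark = SparkSession.builder().getOrCreate()
  import spark.implicits._

  rdd.toDF()
    .groupBy(window($"timestamp", "60 seconds", "10 seconds"))
    .sum("meterIncrement")
    .toJSON
    .toDF("value")
    .write                                            // batch write, not writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "taxi-dollar-accurate")
    .save()
}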

Note

Still, and most importantly, I highly recommend not mixing the RDD and Structured APIs for a use case like this; stick to one or the other.
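For comparison, staying entirely within Structured Streaming, the whole pipeline could be expressed roughly like this (a sketch; the input topic, the payload schema with timestamp and meterIncrement fields, and the checkpoint location are assumptions based on your question):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("taxi-dollar").getOrCreate()
import spark.implicits._

// Assumed payload schema; adjust to the actual message format
val schema = new StructType()
  .add("timestamp", TimestampType)
  .add("meterIncrement", DoubleType)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "taxi-dollar")
  .load()

df.select(from_json($"value".cast("string"), schema).as("data"))
  .select("data.*")
  .withWatermark("timestamp", "60 seconds")
  .groupBy(window($"timestamp", "60 seconds", "10 seconds"))
  .sum("meterIncrement")
  .toJSON
  .toDF("value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "taxi-dollar-accurate")
  .option("checkpointLocation", "/tmp/checkpoints/taxi-dollar")
  .start()
  .awaitTermination()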

answered by Michael Heil