Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to use Spark Structured Streaming with Kafka Direct Stream?

I came across Structured Streaming with Spark, it has an example of continuously consuming from an S3 bucket and writing processed results to a MySQL DB.

// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")

// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
       .writeStream.format("jdbc")
       .start("jdbc:mysql//...")

How can this be used with Spark Kafka Streaming?

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?

like image 931
SergeyB Avatar asked Sep 01 '16 15:09

SergeyB


1 Answers

Is there a way to combine these two examples without using stream.foreachRDD(rdd => {})?

Not yet. Spark 2.0.0 doesn't have Kafka sink support for Structured Streaming. This is a feature that should come out in Spark 2.1.0 according to Tathagata Das, one of the creators of Spark Streaming. Here is the relevant JIRA issue.

Edit: (29/11/2018)

Yes, It's possible with Spark version 2.2 onwards.

stream
  .writeStream // use `write` for batch, like DataFrame
  .format("kafka")
  .option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
  .option("topic", "target-topic1")
  .start()

Check this SO post(read and write on Kafka topic with Spark streaming) for more.

Edit: (06/12/2016)

Kafka 0.10 integration for Structured Streaming is now expiramentaly supported in Spark 2.0.2:

val ds1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

ds1
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]
like image 190
Yuval Itzchakov Avatar answered Oct 15 '22 06:10

Yuval Itzchakov