
How to write streaming dataset to Kafka?

I'm trying to enrich the data in a topic, so I read from Kafka and write the result back to Kafka using Spark Structured Streaming.

val ds = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("subscribe", "topicname")
      .load()


val enriched = ds.select("key", "value", "topic").as[(String, String, String)]
      .map(record => enrich(record._1, record._2, record._3))

val query = enriched.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("group.id", groupId)
      .option("topic", "desttopic")
      .start()

But I'm getting an exception:

Exception in thread "main" java.lang.UnsupportedOperationException: Data source kafka does not support streamed writing
    at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:287)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:266)
    at kafka_bridge.KafkaBridge$.main(KafkaBridge.scala:319)
    at kafka_bridge.KafkaBridge.main(KafkaBridge.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)

Any workarounds?

ppanero asked Mar 24 '17 09:03


2 Answers

As T. Gawęda mentioned, Spark 2.1 has no kafka format for writing streaming datasets to Kafka (i.e. no Kafka sink).

The currently recommended solution in Spark 2.1 is to use the foreach operator.

The foreach operation allows arbitrary operations to be computed on the output data. As of Spark 2.1, this is available only for Scala and Java. To use this, you will have to implement the interface ForeachWriter (Scala/Java docs), which has methods that get called whenever there is a sequence of rows generated as output after a trigger. Note the following important points.
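As a rough illustration of the foreach approach, here is a minimal sketch of a ForeachWriter that forwards each record to Kafka with the plain Kafka producer client. The class name KafkaForeachWriter and its constructor parameters are hypothetical, and it assumes the enriched dataset holds (key, value) string pairs and that the kafka-clients library is on the classpath.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.ForeachWriter

// Hypothetical writer (not part of Spark): sends each (key, value) pair to one Kafka topic.
class KafkaForeachWriter(bootstrapServers: String, topic: String)
    extends ForeachWriter[(String, String)] {

  // Created in open() on the executor, because a KafkaProducer is not serializable.
  @transient private var producer: KafkaProducer[String, String] = _

  // Called once per partition and trigger; returning true means "process this partition".
  override def open(partitionId: Long, version: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", bootstrapServers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true
  }

  // Called for every row of the partition; send() is asynchronous.
  override def process(record: (String, String)): Unit = {
    producer.send(new ProducerRecord[String, String](topic, record._1, record._2))
    ()
  }

  // Called when the partition is done or has failed; close() flushes pending sends.
  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}
```

Assuming enrich returns a (String, String) pair, this could be plugged in as enriched.writeStream.foreach(new KafkaForeachWriter(bootstrapServers, "desttopic")).start(). Records may be written more than once if a batch is retried, so treat this as at-least-once delivery.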

Jacek Laskowski answered Sep 20 '22 06:09


Spark 2.1 (which is currently the latest release of Spark) doesn't have it. The next release, 2.2, will have a Kafka Writer; see this commit.

Kafka Sink is the same as Kafka Writer.
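For reference, once on Spark 2.2 or later with the spark-sql-kafka-0-10 package, writing with the kafka format could look roughly like the sketch below. The object KafkaSinkSketch and its method names are made up for illustration, and it assumes the enriched dataset holds (key, value) string pairs. The sink expects a value column (key is optional) and needs a checkpoint location.

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper object, only for illustration.
object KafkaSinkSketch {

  // Options for the Kafka sink; a checkpoint location is required for this sink.
  def sinkOptions(bootstrapServers: String,
                  topic: String,
                  checkpointDir: String): Map[String, String] =
    Map(
      "kafka.bootstrap.servers" -> bootstrapServers,
      "topic" -> topic,
      "checkpointLocation" -> checkpointDir)

  // Renames the tuple columns to the names the Kafka sink looks for, then starts the query.
  def writeToKafka(enriched: Dataset[(String, String)],
                   options: Map[String, String]): StreamingQuery =
    enriched
      .toDF("key", "value")
      .writeStream
      .format("kafka")
      .options(options)
      .start()
}
```

A call would then be something like KafkaSinkSketch.writeToKafka(enriched, KafkaSinkSketch.sinkOptions(bootstrapServers, "desttopic", "/tmp/checkpoint")), where the checkpoint path is only a placeholder.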

T. Gawęda answered Sep 20 '22 06:09