
How to write a Dataset to Kafka topic?

I am using Spark 2.1.0 and Kafka 0.9.0.

I am trying to push the output of a batch Spark job to Kafka. The job is supposed to run every hour, but not as a streaming job.

While searching online, I could only find material on Kafka integration with Spark Streaming, and nothing about integration with batch jobs.

Does anyone know whether this is feasible?



As mentioned by user8371915, I tried to follow what was done in Writing the output of Batch Queries to Kafka.

I started a Spark shell:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

Here is the simple code that I tried:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column):_*)).alias("value"))
newdf.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("topic", "alerts").save()

But I get this error:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

Any idea what this is related to?


Azzy asked Apr 06 '18 13:04



1 Answer

tl;dr You are using an outdated Spark version. Batch writes to Kafka are supported in Spark 2.2 and later.

Out of the box, you can use the Kafka SQL connector (the same one used with Structured Streaming):

  • Include spark-sql-kafka in your dependencies.
  • Convert the data to a DataFrame containing at least a value column of type StringType or BinaryType.
  • Write the data to Kafka:

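      // server and topic are placeholders for your broker address and target topic
      df.write.format("kafka").option("kafka.bootstrap.servers", server).option("topic", topic).save()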

See the Structured Streaming docs for details (starting with "Writing the output of Batch Queries to Kafka").
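To make the three steps concrete, here is a minimal sketch of a batch write, assuming Spark 2.2 or later with a matching connector on the classpath (for example `--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0`) and a reachable broker. The object name `BatchToKafka`, the column names `name` and `age`, and the choice of `name` as the message key are illustrative assumptions; the broker address and topic are taken from the question. Note also that the 0-10 connector is documented as targeting Kafka brokers 0.10.0 or later, so a Kafka 0.9.0 cluster may need upgrading as well.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical object name; the sketch assumes Spark 2.2+ and a running broker.
object BatchToKafka {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("batch-to-kafka").getOrCreate()
    import spark.implicits._

    val server = "localhost:9092" // broker address from the question
    val topic  = "alerts"         // topic name from the question

    // Sample batch data; column names are illustrative.
    val df = Seq(("Rey", "23"), ("John", "44")).toDF("name", "age")

    // The Kafka sink expects a string or binary "value" column,
    // and optionally a "key" column. Each row is serialized as JSON.
    val out = df.select(
      col("name").as("key"),
      to_json(struct(df.columns.map(col): _*)).as("value")
    )

    // Batch write: write/save rather than writeStream/start.
    out.write
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      .option("topic", topic)
      .save()

    spark.stop()
  }
}
```

The same statements can be pasted into a Spark 2.2+ `spark-shell` without the surrounding object, since the shell already provides `spark` and its implicits.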

Alper t. Turker answered Sep 19 '22 22:09
