 

How to write a Dataset to a Kafka topic?

I am using Spark 2.1.0 and Kafka 0.9.0.

I am trying to push the output of a batch Spark job to Kafka. The job is supposed to run every hour as a scheduled batch job, not as a streaming job.

While looking for an answer online I could only find material on Kafka integration with Spark Streaming, and nothing about integrating it with a batch job.

Does anyone know if such a thing is feasible?

Thanks

UPDATE:

As mentioned by user8371915, I tried to follow what was done in Writing the output of Batch Queries to Kafka.

I used the Spark shell:

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0

Here is the simple code that I tried:

val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")
val newdf = df.select(to_json(struct(df.columns.map(column): _*)).alias("value"))
newdf.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "alerts")
  .save()

But I get the following error:

java.lang.RuntimeException: org.apache.spark.sql.kafka010.KafkaSourceProvider does not allow create table as select.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:497)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
... 50 elided

Any idea what this is related to?

Thanks

asked Apr 06 '18 by Azzy



1 Answer

tl;dr You are using an outdated Spark version. Batch writes to Kafka are enabled in Spark 2.2 and later.
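As a quick sanity check (a small aside, not part of the original answer), you can print the running version directly in spark-shell:

    // spark is the SparkSession pre-bound by spark-shell;
    // batch writes to Kafka require 2.2.0 or later
    println(spark.version)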

Out of the box you can use the Kafka SQL connector (the same one used with Structured Streaming):

  • Include spark-sql-kafka in your dependencies.
  • Convert the data to a DataFrame containing at least a value column of type StringType or BinaryType.
  • Write the data to Kafka:

    df
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      // a topic is required: set it here, or provide a "topic"
      // column in df (see the alternative sketch below)
      .option("topic", topic)
      .save()

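Alternatively (an aside, not part of the original answer), the connector also reads an optional per-record topic column, so the topic option can be omitted if df carries one. Here server is a placeholder for your broker address, as above:

    import org.apache.spark.sql.functions.lit

    df
      .withColumn("topic", lit("alerts"))  // "alerts" is the topic from the question
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", server)
      .save()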

Follow the Structured Streaming docs for details (starting with Writing the output of Batch Queries to Kafka).
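For reference, here is a minimal self-contained sketch of the whole flow on Spark 2.2+. The broker address localhost:9092 and topic name alerts come from the question; adjust them for your setup:

    // start the shell with a matching connector version, e.g.:
    // spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

    import org.apache.spark.sql.functions.{col, struct, to_json}
    import spark.implicits._  // pre-imported in spark-shell; needed for toDF

    // sample batch data, as in the question
    val df = Seq(("Rey", "23"), ("John", "44")).toDF("key", "value")

    // Kafka expects a "value" column (StringType or BinaryType);
    // here every row is serialized to a JSON string
    val out = df.select(to_json(struct(df.columns.map(col): _*)).alias("value"))

    // a plain batch write -- no streaming involved
    out.write
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "alerts")
      .save()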

answered Sep 19 '22 by Alper t. Turker