
Read JSON from Kafka and write JSON to another Kafka topic

I'm trying to prepare an application for Spark Streaming (Spark 2.1, Kafka 0.10).

I need to read data from the Kafka topic "input", find the correct data, and write the result to the topic "output".

I can read data from Kafka based on the KafkaUtils.createDirectStream method.

I convert the RDD to JSON and prepare the filters:

val messages = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

val elementDstream = messages.map(v => v.value).foreachRDD { rdd =>
  import spark.implicits._

  // Parse each JSON string in the batch using the predefined schema
  val PeopleDf = spark.read.schema(schema1).json(rdd)
  PeopleDf.show()

  // Keep rows where value1 matches "1" or value2 equals 2
  val PeopleDfFilter = PeopleDf.filter($"value1".rlike("1") || $"value2" === 2)
  PeopleDfFilter.show()
}

I can also load data from Kafka and write it "as is" back to Kafka using KafkaProducer:

import java.util.HashMap

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

messages.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    val kafkaTopic = "output"
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // One producer per partition; forward each record value unchanged
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { record: ConsumerRecord[String, String] =>
      System.out.print("########################" + record.value())
      producer.send(new ProducerRecord[String, String](kafkaTopic, record.value()))
    }
    producer.close()
  }
}

However, I cannot integrate those two actions: find the proper values in the JSON and write the findings to Kafka, i.e. write PeopleDfFilter in JSON format to the "output" Kafka topic.

I have a lot of input messages in Kafka, which is why I want to use foreachPartition to create the Kafka producer.
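For reference, here is a minimal sketch of one way to combine the two snippets: parse and filter inside foreachRDD, then serialize the matching rows back to JSON with toJSON and send them from each partition. It assumes messages, spark, and schema1 are defined as above, uses the imports from the producer snippet, and reuses the broker address and topic name from the question:

messages.map(_.value).foreachRDD { rdd =>
  import spark.implicits._

  val PeopleDf = spark.read.schema(schema1).json(rdd)
  val PeopleDfFilter = PeopleDf.filter($"value1".rlike("1") || $"value2" === 2)

  // toJSON turns each filtered Row back into a JSON string
  PeopleDfFilter.toJSON.foreachPartition { partition =>
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")

    // One short-lived producer per partition, as in the snippet above
    val producer = new KafkaProducer[String, String](props)
    partition.foreach { json =>
      producer.send(new ProducerRecord[String, String]("output", json))
    }
    producer.close()
  }
}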

asked Nov 23 '17 by Tomtom



1 Answer

The process is very simple, so why not use Structured Streaming all the way?

import org.apache.spark.sql.functions.{from_json, to_json}
import spark.implicits._

spark
  // Read the data
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", inservers)
  .option("subscribe", intopic)
  .load()
  // Transform / filter
  .select(from_json($"value".cast("string"), schema).alias("value"))
  .filter($"value.value1".rlike("1") || $"value.value2" === 2)  // condition from the question
  .select(to_json($"value").alias("value"))
  // Write back
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", outservers)
  .option("topic", outtopic)  // the Kafka sink takes "topic", not "subscribe"
  .option("checkpointLocation", checkpointDir)  // required by the Kafka sink
  .start()
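
One detail worth adding: start() returns a StreamingQuery, and in a standalone driver you normally block on it so the application keeps running (query is just an illustrative name for the returned value):

val query = ???  // the writeStream pipeline above, ending in .start()
query.awaitTermination()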
answered Oct 18 '22 by user8996943