I am using Spark Streaming to process data between two Kafka queues, but I cannot seem to find a good way to write to Kafka from Spark. I have tried this:
input.foreachRDD(rdd =>
  rdd.foreachPartition(partition =>
    partition.foreach {
      case x: String => {
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")

        println(x)
        val producer = new KafkaProducer[String, String](props)
        val message = new ProducerRecord[String, String]("output", null, x)
        producer.send(message)
      }
    }
  )
)
and it works as intended, but instantiating a new KafkaProducer for every message is clearly infeasible in a real context, and I'm trying to work around it.
I would like to keep a reference to a single instance for every process and access it when I need to send a message. How can I write to Kafka from Spark Streaming?
Yes, unfortunately Spark (1.x, 2.x) doesn't make it straightforward to write to Kafka in an efficient manner.
I'd suggest the following approach: use (and re-use) one KafkaProducer instance per executor process/JVM.

Here's the high-level setup for this approach. You "wrap" the KafkaProducer because, as you mentioned, it is not serializable; wrapping it allows you to "ship" it to the executors. The key idea is to use a lazy val so that you delay instantiating the producer until its first use, which is effectively a workaround so that you don't need to worry about KafkaProducer not being serializable. You then hand the wrapped producer to each executor through a broadcast variable and use it from your processing logic to write the results back to Kafka.

The code snippets below work with Spark Streaming as of Spark 2.0.
Step 1: Wrapping KafkaProducer
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}

class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {

  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()

  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))

  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))

}

object MySparkKafkaProducer {

  import scala.collection.JavaConversions._

  def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)

      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }

      producer
    }
    new MySparkKafkaProducer(createProducerFunc)
  }

  def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)

}
Step 2: Use a broadcast variable to give each executor its own wrapped KafkaProducer instance
import java.util.Properties

import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc: StreamingContext = {
  val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
  new StreamingContext(sparkConf, Seconds(1))
}

ssc.checkpoint("checkpoint-directory")

val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", "broker1:9092")
    p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Step 3: Write from Spark Streaming to Kafka, re-using the same wrapped KafkaProducer instance (for each executor)
import java.util.concurrent.Future

import org.apache.kafka.clients.producer.RecordMetadata
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
      kafkaProducer.value.send("my-output-topic", record)
    }.toStream
    // Force evaluation of the lazy Stream and block until Kafka acknowledges each send.
    metadata.foreach { metadata => metadata.get() }
  }
}
Hope this helps.
My first advice would be to try to create a new instance in foreachPartition and measure whether that is fast enough for your needs (instantiating heavy objects in foreachPartition is what the official documentation suggests).
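For reference, a minimal sketch of that per-partition variant is below; the broker address, topic name, and String serializers are placeholder assumptions, not taken from the question:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

val stream: DStream[String] = ???

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One producer per partition (per task), not one per record.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "broker1:9092")
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    try {
      partition.foreach { record =>
        producer.send(new ProducerRecord[String, String]("output", record))
      }
    } finally {
      // close() flushes any buffered records before releasing resources.
      producer.close()
    }
  }
}

This amortizes the producer's setup cost over a whole partition rather than a single record; whether that is enough depends on your batch interval and partition count.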
Another option is to use an object pool as illustrated in this example:
https://github.com/miguno/kafka-storm-starter/blob/develop/src/main/scala/com/miguno/kafkastorm/kafka/PooledKafkaProducerAppFactory.scala
However, I found it hard to implement when using checkpointing.
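To make the pool idea more concrete, here is a rough sketch using Apache Commons Pool 2; it is not the linked example's exact code, and the class names, broker address, and topic are illustrative only:

import java.util.Properties

import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Tells the pool how to create, wrap, and destroy producers.
class PooledProducerFactory(config: Properties)
    extends BasePooledObjectFactory[KafkaProducer[String, String]] {

  override def create(): KafkaProducer[String, String] =
    new KafkaProducer[String, String](config)

  override def wrap(producer: KafkaProducer[String, String]): PooledObject[KafkaProducer[String, String]] =
    new DefaultPooledObject(producer)

  override def destroyObject(p: PooledObject[KafkaProducer[String, String]]): Unit =
    p.getObject.close()
}

// The pool is not serializable either, so it is created lazily inside a
// singleton object that each executor JVM initializes on first use.
object ProducerPool {
  lazy val pool: GenericObjectPool[KafkaProducer[String, String]] = {
    val config = new Properties()
    config.setProperty("bootstrap.servers", "broker1:9092")
    config.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    config.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    new GenericObjectPool(new PooledProducerFactory(config))
  }
}

val stream: DStream[String] = ???

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // Borrow a producer for the whole partition and always return it.
    val producer = ProducerPool.pool.borrowObject()
    try {
      partition.foreach(record => producer.send(new ProducerRecord[String, String]("output", record)))
    } finally {
      ProducerPool.pool.returnObject(producer)
    }
  }
}

Compared with the single broadcast instance above, a pool lets concurrent tasks on an executor use separate producers, at the price of managing the pool's lifecycle (which, as noted, interacts awkwardly with checkpointing).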
Another version that is working well for me is a factory as described in the following blog post; you just have to check whether it provides enough parallelism for your needs (see the comments section):
http://allegro.tech/2015/08/spark-kafka-integration.html
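The gist of that factory pattern is roughly the following; this is an illustrative sketch in the spirit of the post, not its exact code, and all names here are hypothetical:

import java.util.Properties

import scala.collection.concurrent.TrieMap

import org.apache.kafka.clients.producer.KafkaProducer

// A JVM-wide cache: every task running in the same executor gets the same
// producer for a given configuration, created lazily on first use.
object KafkaProducerCache {

  private val producers = TrieMap.empty[Properties, KafkaProducer[String, String]]

  def getOrCreate(config: Properties): KafkaProducer[String, String] =
    producers.getOrElseUpdate(config, {
      val producer = new KafkaProducer[String, String](config)
      // Flush and release the producer when the executor JVM shuts down.
      sys.addShutdownHook(producer.close())
      producer
    })
}

Tasks call KafkaProducerCache.getOrCreate(config) inside foreachPartition; because the object is initialized on each executor's JVM, nothing has to be serialized, broadcast, or checkpointed.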