I would like to have a consumer actor subscribe to a Kafka topic and stream data for further processing with Spark Streaming outside the consumer. Why an actor? Because I read that an actor's supervisor strategy would be a good way to handle Kafka failures (e.g., restarting on failure).
I found two options:
1. The KafkaConsumer class: its poll() method returns a Map[String, Object]. I would like a DStream to be returned, just like KafkaUtils.createDirectStream would return, and I don't know how to fetch the stream from outside the actor.
2. The ActorHelper trait, using actorStream() as shown in this example. However, this latter option shows a connection to a socket, not to a topic.

Could anyone point me in the right direction?
For handling Kafka failures, I used the Apache Curator framework and the following workaround:
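```scala
import org.apache.curator.CuratorZookeeperClient
import org.apache.curator.framework.CuratorFramework
import scala.util.Try

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
 * Returns false if ZooKeeper is unreachable or if no Kafka broker is
 * registered under /brokers/ids (i.e. Kafka or ZooKeeper is down).
 */
def isKafkaAvailable: Boolean =
  Try {
    if (zk.isConnected) {
      // Each live broker registers an ephemeral znode under /brokers/ids
      val xs = client.getChildren.forPath("/brokers/ids")
      xs.size() > 0
    }
    else false
  }.getOrElse(false)
```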
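If the application should wait for Kafka to come back instead of only reporting it as down, the check can be wrapped in a small retry loop. This is a minimal sketch: `KafkaAvailability`, `waitUntilAvailable` and its parameters are hypothetical names, and the check is passed in as a function so the loop can be exercised without a running ZooKeeper.

```scala
import scala.annotation.tailrec

// Hypothetical helper, not part of Curator or Kafka.
object KafkaAvailability {
  /** Calls `isAvailable` up to `maxAttempts` times, sleeping `delayMs` between attempts.
    * Returns true as soon as one check succeeds, false if every attempt fails. */
  def waitUntilAvailable(isAvailable: () => Boolean, maxAttempts: Int, delayMs: Long): Boolean = {
    @tailrec
    def loop(attempt: Int): Boolean =
      if (attempt > maxAttempts) false
      else if (isAvailable()) true
      else {
        // Do not sleep after the final failed attempt
        if (attempt < maxAttempts) Thread.sleep(delayMs)
        loop(attempt + 1)
      }
    loop(1)
  }
}
```

For example, `KafkaAvailability.waitUntilAvailable(() => isKafkaAvailable, maxAttempts = 10, delayMs = 2000L)` would check ten times, two seconds apart, before giving up.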
For consuming Kafka topics, I used the com.softwaremill.reactivekafka library. For example:
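```scala
import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.clients.consumer.ConsumerRecord

class KafkaConsumerActor extends Actor {
  // kafka.consume() expects an implicit ActorSystem; run() needs an implicit materializer
  implicit val system: ActorSystem = context.system
  implicit val materializer: ActorMaterializer = ActorMaterializer()(context)

  val kafka = new ReactiveKafka()
  val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

  override def preStart(): Unit = {
    super.preStart()
    val publisher = kafka.consume(config)
    Source.fromPublisher(publisher)
      .map(handleKafkaRecord)
      .to(Sink.ignore)
      .run()
  }

  /**
   * Invoked once for every record consumed from the Kafka topic.
   */
  def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]): Unit = {
    // handle record
  }

  // The actor itself handles no messages; records arrive through the stream
  def receive: Receive = Actor.emptyBehavior
}
```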
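Since the question is about restarting on failure, here is a minimal sketch of a parent actor whose supervisor strategy restarts the consumer. `KafkaSupervisor`, the retry limits and the child name `kafka-consumer` are hypothetical choices; `OneForOneStrategy` and `SupervisorStrategy` are the standard classic Akka actor API. One caveat, as far as I understand Akka Streams: an exception inside the stream started in `preStart` fails the stream, not the actor, so for the supervisor to see it you would need to surface the failure to the actor yourself (for example by piping the failure of the `Future` returned by `runWith(Sink.ignore)` back to `self` and throwing there).

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._

object KafkaSupervisor {
  // Hypothetical policy: restart the child on any Exception, escalate anything else
  val decider: SupervisorStrategy.Decider = {
    case _: Exception => Restart
    case _            => Escalate
  }
}

class KafkaSupervisor(consumerProps: Props) extends Actor {
  // Allow at most 10 restarts per minute before the child is stopped
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute)(KafkaSupervisor.decider)

  private val consumer: ActorRef = context.actorOf(consumerProps, "kafka-consumer")

  // Forward any messages to the supervised consumer
  def receive: Receive = {
    case msg => consumer.forward(msg)
  }
}
```

It could be started with something like `system.actorOf(Props(new KafkaSupervisor(Props[KafkaConsumerActor])), "kafka-supervisor")`. Keeping the decider in the companion object makes the restart policy easy to check on its own.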