
Spark-Streaming from an Actor

I would like to have a consumer actor subscribe to a Kafka topic and stream data for further processing with Spark Streaming outside the consumer. Why an actor? Because I read that its supervisor strategy would be a great way to handle Kafka failures (e.g., restart on a failure).

I found two options:

  • The Java KafkaConsumer class: its poll() method returns a Map[String, Object]. I would like a DStream instead, like the one KafkaUtils.createDirectStream returns, and I don't know how to expose the stream outside the actor.
  • Extending the ActorHelper trait and using actorStream(), as shown in this example. However, that example connects to a socket, not to a Kafka topic.

Could anyone point me in the right direction?

wipman asked Feb 21 '17 08:02


1 Answer

For handling Kafka failures, I used the Apache Curator framework and the following workaround:

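import org.apache.curator.CuratorZookeeperClient
import org.apache.curator.framework.CuratorFramework
import scala.util.Try

val client: CuratorFramework = ... // see docs
val zk: CuratorZookeeperClient = client.getZookeeperClient

/**
  * Returns false if Kafka or ZooKeeper is down.
  */
def isKafkaAvailable: Boolean =
  Try {
    // Every live broker registers a node under /brokers/ids,
    // so an empty list means that no broker is currently up.
    zk.isConnected && !client.getChildren.forPath("/brokers/ids").isEmpty
  }.getOrElse(false)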
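
One way to use such a check is to wait until Kafka is reachable before starting the consumer. The following is only a sketch using the Scala standard library; AvailabilityGate, awaitAvailable, and its parameters are hypothetical names chosen for illustration, not part of Curator or Kafka.

```scala
import scala.annotation.tailrec
import scala.util.Try

object AvailabilityGate {
  /**
    * Calls `check` up to `maxAttempts` times, sleeping `delayMillis`
    * between attempts. Returns true as soon as the check succeeds and
    * false if every attempt fails. A check that throws counts as a failure.
    */
  @tailrec
  def awaitAvailable(check: () => Boolean, maxAttempts: Int, delayMillis: Long): Boolean =
    if (maxAttempts <= 0) false
    else if (Try(check()).getOrElse(false)) true
    else {
      // Do not sleep after the last attempt
      if (maxAttempts > 1) Thread.sleep(delayMillis)
      awaitAvailable(check, maxAttempts - 1, delayMillis)
    }
}
```

For example, AvailabilityGate.awaitAvailable(() => isKafkaAvailable, maxAttempts = 10, delayMillis = 1000L) would retry the check above up to ten times, one second apart. It blocks the calling thread, so it should not be called from inside an actor's receive.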

For consuming Kafka topics, I used the com.softwaremill.reactivekafka library. For example:

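import akka.actor.{Actor, ActorSystem}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.clients.consumer.ConsumerRecord

class KafkaConsumerActor extends Actor {
   // Implicits assumed to be needed for consuming and for running the stream.
   // The materializer is bound to this actor, so the stream stops with it.
   implicit val system: ActorSystem = context.system
   implicit val materializer: ActorMaterializer = ActorMaterializer()(context)

   val kafka = new ReactiveKafka()
   val config: ConsumerProperties[Array[Byte], Any] = ... // see docs

   override def preStart(): Unit = {
      super.preStart()

      // Subscribe to the topic and pass every record to handleKafkaRecord
      val publisher = kafka.consume(config)
      Source.fromPublisher(publisher)
            .map(handleKafkaRecord)
            .to(Sink.ignore).run()
   }

   /**
     * Invoked for each record received from Kafka.
     */
   def handleKafkaRecord(r: ConsumerRecord[Array[Byte], Any]): Unit = {
      // handle record
   }

   // The actor handles no messages itself; it only hosts the stream
   def receive: Receive = Actor.emptyBehavior
}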
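
Since the question was motivated by supervision, here is a rough sketch of a parent actor that restarts the consumer when it fails, using the classic Akka supervision API. KafkaConsumerSupervisor, the retry limits, and the child name "kafka-consumer" are assumptions made for illustration. Note also that supervision only reacts to exceptions thrown by the child actor itself; surfacing stream failures as actor failures is not shown here.

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart}
import scala.concurrent.duration._

object KafkaConsumerSupervisor {
  // Restart a failing child at most 10 times per minute (arbitrary limits);
  // anything that is not an Exception (e.g. an Error) is escalated.
  val strategy: OneForOneStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: Exception => Restart
      case _            => Escalate
    }
}

class KafkaConsumerSupervisor(consumerProps: Props) extends Actor {
  override val supervisorStrategy: SupervisorStrategy = KafkaConsumerSupervisor.strategy

  // The consumer runs as a child, so its failures are handled by the strategy above
  private val consumer: ActorRef = context.actorOf(consumerProps, "kafka-consumer")

  def receive: Receive = {
    case msg => consumer.forward(msg)
  }
}
```

It could be started with something like system.actorOf(Props(new KafkaConsumerSupervisor(Props[KafkaConsumerActor]))). On a restart, the child's preStart() runs again by default, which would resubscribe to the topic.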

John Mullins answered Nov 01 '22 03:11