
Clear kafka topics for unit testing

I need to unit test a Kafka application without using third-party libraries.

My problem right now is that I would like to clear all the topics between tests but I don't know how.

This is my temporary solution: commit every message produced after each test and put all test consumers in the same consumer group.

override protected def afterEach(): Unit = {
    val cleanerConsumer = newConsumer(Seq.empty)
    val topics = cleanerConsumer.listTopics()
    println("cleaning up")
    cleanerConsumer.subscribe(topics.keySet())
    cleanerConsumer.poll(100)
    cleanerConsumer.commitSync()
    cleanerConsumer.close()
}

This doesn't work though and I don't know why.

For example, when I create a new consumer inside a test, messages contains the messages produced in the previous test.

val consumerProbe = newConsumer(SMSGatewayTopic)

val messages = consumerProbe.poll(1000)

How can I solve this?

Chobeat asked Aug 09 '16 12:08



2 Answers

You can also embed Kafka and ZooKeeper instances in your test sources, to have more control over such isolated services.

trait Kafka { self: ZooKeeper =>
  Kafka.start()
}

object Kafka {
  import org.apache.hadoop.fs.FileUtil
  import kafka.server.KafkaServer

  @volatile private var started = false

  lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile

  lazy val kafkaServer: KafkaServer = {
    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)

    val (host, port) = {
      val (h, p) = config.getString("kafka.servers").span(_ != ':')
      h -> p.drop(1).toInt
    }

    val serverConf = new kafka.server.KafkaConfig({
      val props = new java.util.Properties()
      props.put("port", port.toString)
      props.put("broker.id", port.toString)
      props.put("log.dir", logDir.getAbsolutePath)

      props.put(
        "zookeeper.connect",
        s"localhost:${config getInt "test.zookeeper.port"}"
      )

      props
    })

    new KafkaServer(serverConf)
  }

  def start(): Unit = if (!started) {
    try {
      kafkaServer.startup()
      started = true
    } catch {
      case err: Throwable =>
        println(s"fails to start Kafka: ${err.getMessage}")
        throw err
    }
  }

  def stop(): Unit = try {
    if (started) kafkaServer.shutdown()
  } finally {
    FileUtil.fullyDelete(logDir)
  }
}

trait ZooKeeper {
  ZooKeeper.start()
}

object ZooKeeper {
  import java.nio.file.Files
  import java.net.InetSocketAddress
  import org.apache.hadoop.fs.FileUtil
  import org.apache.zookeeper.server.ZooKeeperServer
  import org.apache.zookeeper.server.ServerCnxnFactory

  @volatile private var started = false
  lazy val logDir = Files.createTempDirectory("zk-log").toFile
  lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile

  lazy val (zkServer, zkFactory) = {
    val srv = new ZooKeeperServer(
      snapshotDir, logDir, 500
    )

    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)
    val port = config.getInt("test.zookeeper.port")

    srv -> ServerCnxnFactory.createFactory(
      new InetSocketAddress("localhost", port), 1024
    )
  }

  def start(): Unit = if (!zkServer.isRunning) {
    try {
      zkFactory.startup(zkServer)

      started = true

      while (!zkServer.isRunning) {
        Thread.sleep(500)
      }
    } catch {
      case err: Throwable =>
        println(s"fails to start ZooKeeper: ${err.getMessage}")
        throw err
    }
  }

  def stop(): Unit = try {
    if (started) zkFactory.shutdown()
  } finally {
    try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable => () }
    FileUtil.fullyDelete(snapshotDir)
  }
}

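Test classes can then extend ZooKeeper with Kafka to ensure both services are available. ZooKeeper comes first because Scala initializes mixed-in traits from left to right, and the broker needs ZooKeeper to be running when it starts.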
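The order in which the two traits are mixed in matters, because each trait body calls start() when the test class is constructed. The following is a minimal sketch of that behaviour using stand-in traits (FakeZooKeeper, FakeKafka and InitOrderDemo are hypothetical names, not the real services above); it only records the order in which the trait bodies run.

```scala
import scala.collection.mutable.ListBuffer

object InitOrderDemo {
  // Records the order in which the stand-in services are "started".
  val startOrder = ListBuffer.empty[String]

  // Hypothetical stand-ins for the ZooKeeper and Kafka traits:
  // each trait body runs once when an instance is constructed.
  trait FakeZooKeeper { startOrder += "zookeeper" }
  trait FakeKafka { self: FakeZooKeeper => startOrder += "kafka" }

  // Builds an instance and returns the start order observed while doing so.
  def orderOf(create: () => Any): List[String] = {
    startOrder.clear()
    create()
    startOrder.toList
  }

  def main(args: Array[String]): Unit = {
    // ZooKeeper listed first: its body runs before Kafka's.
    println(orderOf(() => new FakeZooKeeper with FakeKafka {}))
    // Kafka listed first: its body runs before ZooKeeper's,
    // even though the self-type requires ZooKeeper to be mixed in.
    println(orderOf(() => new FakeKafka with FakeZooKeeper {}))
  }
}
```

The self-type only constrains which traits must be present; it does not reorder initialization, so the trait listed first is the one started first.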

If the test JVM is not forked, Tests.Cleanup in the SBT setting testOptions in Test can be used to stop the embedded services once all the tests have run.
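As a rough sketch of such a cleanup hook in build.sbt: it assumes the Kafka and ZooKeeper objects above live in the default package (adjust the names passed to the hypothetical stop helper if they are in a package), and it reaches them reflectively through the test class loader, since the build definition cannot reference test classes directly.

```scala
// build.sbt
fork in Test := false // the cleanup below must run in the same JVM as the tests

testOptions in Test += Tests.Cleanup { (loader: ClassLoader) =>
  // A Scala `object Foo` compiles to class `Foo$` with a static MODULE$ field
  // holding the singleton instance.
  def stop(objectName: String): Unit = {
    val cls = loader.loadClass(objectName + "$")
    val instance = cls.getField("MODULE$").get(null)
    cls.getMethod("stop").invoke(instance)
  }

  stop("Kafka")     // shut the broker down first
  stop("ZooKeeper") // then the ZooKeeper server it depends on
}
```

Stopping the broker before ZooKeeper lets it shut down while its coordination service is still reachable.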

cchantep answered Nov 05 '22 18:11


I would suggest simply recreating all topics before your tests. For example, this is how Kafka's own tests create and delete topics:

Kafka repository on GitHub
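The linked test code is not reproduced here. As an alternative illustration of the same idea, the sketch below recreates topics with the AdminClient from the kafka-clients library, which newer Kafka versions provide. The names TopicReset, topicsToDelete and recreateTopics are hypothetical, the single partition and replication factor of 1 are assumptions suited to a one-broker test setup, and topic deletion must be enabled on the broker.

```scala
import java.util.Properties
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.collection.JavaConverters._

object TopicReset {
  // Of the topics the tests want, only those that currently exist need deleting.
  def topicsToDelete(existing: Set[String], wanted: Seq[String]): Seq[String] =
    wanted.distinct.filter(existing.contains)

  // Deletes the given topics if present, then creates them again empty.
  def recreateTopics(bootstrapServers: String, topics: Seq[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try {
      val existing = admin.listTopics().names().get().asScala.toSet
      val stale = topicsToDelete(existing, topics)
      if (stale.nonEmpty) admin.deleteTopics(stale.asJava).all().get()

      // Assumption: 1 partition, replication factor 1 (single test broker).
      // The broker may finish deleting asynchronously, so this call can fail
      // with a "topic already exists" error if issued too early.
      val fresh = topics.distinct.map(name => new NewTopic(name, 1, 1.toShort))
      admin.createTopics(fresh.asJava).all().get()
    } finally {
      admin.close()
    }
  }
}
```

A test suite could call TopicReset.recreateTopics with the broker address and its topic names from a beforeEach hook, retrying the creation step if the broker has not finished the deletion yet.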

codejitsu answered Nov 05 '22 17:11