I need to unit test a Kafka application without using third-party libraries.
My problem right now is that I would like to clear all the topics between tests, but I don't know how.
This is my temporary solution: after each test, commit every message that was produced, and put all test consumers in the same consumer group.
override protected def afterEach(): Unit = {
  val cleanerConsumer = newConsumer(Seq.empty)
  val topics = cleanerConsumer.listTopics()
  println("cleaning up")
  cleanerConsumer.subscribe(topics.keySet())
  cleanerConsumer.poll(100)
  cleanerConsumer.commitSync()
  cleanerConsumer.close()
}
This doesn't work, though, and I don't know why.
For example, when I create a new consumer inside a test, messages (below)
still contains the messages produced in the previous test:
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
How can I solve this?
Testing a Kafka consumer: consuming data from Kafka consists of two main steps. First, we have to subscribe to topics or assign topic partitions manually. Second, we poll batches of records using the poll method. The polling is usually done in an infinite loop.
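The two steps can be tried without a broker by using MockConsumer, which ships in the kafka-clients artifact itself. What follows is only a minimal sketch, not the approach used in the question: it assumes Scala 2.13 and a kafka-clients version that has poll(Duration) and the OffsetResetStrategy constructor, and the topic name and the pollOnce helper are made up for illustration. Since every test builds its own MockConsumer, nothing is left over from a previous test.

```scala
import java.time.Duration
import java.util.Collections
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

object MockConsumerSketch {
  // Hypothetical topic name, used only for this illustration.
  val topic = "sms-gateway"

  // Feeds the given values to a fresh in-memory consumer and returns what one poll yields.
  def pollOnce(values: Seq[String]): List[String] = {
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    val tp = new TopicPartition(topic, 0)

    // Step 1: assign the partition manually and tell the mock where it begins.
    consumer.assign(Collections.singletonList(tp))
    consumer.updateBeginningOffsets(
      Collections.singletonMap(tp, java.lang.Long.valueOf(0L)))

    // Queue the records that the next poll should return.
    values.zipWithIndex.foreach { case (v, i) =>
      consumer.addRecord(
        new ConsumerRecord[String, String](topic, 0, i.toLong, s"key-$i", v))
    }

    // Step 2: poll one batch of records.
    try consumer.poll(Duration.ofMillis(100)).asScala.map(_.value()).toList
    finally consumer.close()
  }
}
```

A test would call MockConsumerSketch.pollOnce(Seq("a", "b")) and compare the result with the values it queued.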
As a rule of thumb, the number of Kafka topics in a cluster can be in the thousands. Jun Rao (Kafka committer; now at Confluent, formerly on LinkedIn's Kafka team) wrote: "At LinkedIn, our largest cluster has more than 2K topics. 5K topics should be fine."
You can also embed Kafka and ZooKeeper instances in your test sources, to have more control over such isolated services.
trait Kafka { self: ZooKeeper =>
  // Starts the shared embedded broker when a test class mixing in this trait is created.
  Kafka.start()
}

object Kafka {
  import org.apache.hadoop.fs.FileUtil
  import kafka.server.KafkaServer

  @volatile private var started = false

  lazy val logDir = java.nio.file.Files.createTempDirectory("kafka-log").toFile

  lazy val kafkaServer: KafkaServer = {
    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)

    // "kafka.servers" is expected to look like "host:port".
    val (host, port) = {
      val (h, p) = config.getString("kafka.servers").span(_ != ':')
      h -> p.drop(1).toInt
    }

    val serverConf = new kafka.server.KafkaConfig({
      val props = new java.util.Properties()
      props.put("port", port.toString)
      props.put("broker.id", port.toString)
      props.put("log.dir", logDir.getAbsolutePath)
      props.put(
        "zookeeper.connect",
        s"localhost:${config getInt "test.zookeeper.port"}"
      )
      props
    })

    new KafkaServer(serverConf)
  }

  def start(): Unit = if (!started) {
    try {
      kafkaServer.startup()
      started = true
    } catch {
      case err: Throwable =>
        println(s"fails to start Kafka: ${err.getMessage}")
        throw err
    }
  }

  // Shuts the broker down and always removes its temporary log directory.
  def stop(): Unit = try {
    if (started) kafkaServer.shutdown()
  } finally {
    FileUtil.fullyDelete(logDir)
  }
}

trait ZooKeeper {
  // Starts the shared embedded ZooKeeper when a test class mixing in this trait is created.
  ZooKeeper.start()
}

object ZooKeeper {
  import java.nio.file.Files
  import java.net.InetSocketAddress
  import org.apache.hadoop.fs.FileUtil
  import org.apache.zookeeper.server.ZooKeeperServer
  import org.apache.zookeeper.server.ServerCnxnFactory

  @volatile private var started = false

  lazy val logDir = Files.createTempDirectory("zk-log").toFile
  lazy val snapshotDir = Files.createTempDirectory("zk-snapshots").toFile

  lazy val (zkServer, zkFactory) = {
    val srv = new ZooKeeperServer(
      snapshotDir, logDir, 500
    )
    val config = com.typesafe.config.ConfigFactory.
      load(this.getClass.getClassLoader)
    val port = config.getInt("test.zookeeper.port")

    srv -> ServerCnxnFactory.createFactory(
      new InetSocketAddress("localhost", port), 1024
    )
  }

  def start(): Unit = if (!zkServer.isRunning) {
    try {
      zkFactory.startup(zkServer)
      started = true

      // Block until the server reports that it is running.
      while (!zkServer.isRunning) {
        Thread.sleep(500)
      }
    } catch {
      case err: Throwable =>
        println(s"fails to start ZooKeeper: ${err.getMessage}")
        throw err
    }
  }

  // Shuts ZooKeeper down and removes its temporary directories.
  def stop(): Unit = try {
    if (started) zkFactory.shutdown()
  } finally {
    try { FileUtil.fullyDelete(logDir) } catch { case _: Throwable => () }
    FileUtil.fullyDelete(snapshotDir)
  }
}
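The test classes can mix in both traits (extends ZooKeeper with Kafka)
to ensure these services are available. ZooKeeper is listed first so that it is started before the Kafka broker that connects to it.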
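Because both traits start their service from the trait body, the order in which they are mixed in decides the startup order: Scala runs trait bodies left to right. The sketch below shows this with two stand-in traits that only record their name. FakeZooKeeper, FakeKafka and the spec classes are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable.ListBuffer

object MixinOrderDemo {
  // Records the order in which the trait bodies run.
  val started: ListBuffer[String] = ListBuffer.empty

  // Hypothetical stand-ins for the ZooKeeper and Kafka traits.
  trait FakeZooKeeper { started += "zookeeper" }
  trait FakeKafka { self: FakeZooKeeper => started += "kafka" }

  // ZooKeeper is mixed in first, so its body runs first.
  class ZooKeeperFirstSpec extends FakeZooKeeper with FakeKafka

  // The self-type is still satisfied, but the Kafka body now runs first.
  class KafkaFirstSpec extends FakeKafka with FakeZooKeeper

  // Instantiates a spec and returns the startup order it produced.
  def orderOf(create: () => Any): List[String] = {
    started.clear()
    create()
    started.toList
  }
}
```

Calling MixinOrderDemo.orderOf(() => new MixinOrderDemo.ZooKeeperFirstSpec) lists zookeeper before kafka, while the KafkaFirstSpec variant lists them the other way round, which with the real traits would try to start the broker before ZooKeeper is listening.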
If the test JVM is not forked, Tests.Cleanup
in the SBT setting testOptions in Test
can be used to stop the embedded services after testing.
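A possible build.sbt fragment for this is sketched below. It is an assumption-laden illustration rather than a tested build: it uses the sbt 1.x slash syntax (older builds write testOptions in Test), it assumes the Kafka and ZooKeeper objects live in the default package, and stopEmbedded is a hypothetical helper. The objects are reached by reflection through the class loader that sbt hands to the cleanup function, because the build definition cannot reference test classes directly.

```scala
// build.sbt (sketch)

// Hypothetical helper: calls stop() on a Scala 2 singleton object loaded by the test class loader.
def stopEmbedded(loader: ClassLoader, objectName: String): Unit = {
  val cls = loader.loadClass(objectName + "$")    // an object named X compiles to class X$
  val module = cls.getField("MODULE$").get(null)  // the singleton instance
  cls.getMethod("stop").invoke(module)
}

Test / testOptions += Tests.Cleanup { (loader: ClassLoader) =>
  // Stop the broker before the ZooKeeper instance it depends on.
  stopEmbedded(loader, "Kafka")
  stopEmbedded(loader, "ZooKeeper")
}
```

If the objects are declared inside a package, the names passed to stopEmbedded need the fully qualified form.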
I would suggest simply recreating all topics before your tests. For example, this is how Kafka's own tests create and delete topics:
Kafka repository on GitHub
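As a rough idea of what recreating topics could look like from test code, here is a sketch based on the AdminClient from kafka-clients. This is not the helper used in Kafka's own test suite. It assumes Scala 2.13, a broker that allows topic deletion (delete.topic.enable), and a single-broker setup where one partition and a replication factor of 1 are acceptable. TopicReset, toNewTopics and recreateAll are names invented for this example.

```scala
import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object TopicReset {
  // Builds fresh topic definitions; the partition and replica counts are
  // assumptions suited to a single-broker test setup.
  def toNewTopics(
      names: Iterable[String],
      partitions: Int = 1,
      replication: Short = 1): java.util.List[NewTopic] =
    names.map(n => new NewTopic(n, partitions, replication)).toList.asJava

  // Deletes every non-internal topic and creates it again, empty.
  def recreateAll(bootstrapServers: String): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try {
      // listTopics() leaves out internal topics such as __consumer_offsets by default.
      val names = admin.listTopics().names().get().asScala.toList
      if (names.nonEmpty) {
        admin.deleteTopics(names.asJava).all().get()
        // Deletion completes asynchronously on the broker, so creating a topic
        // with the same name right away may fail with TopicExistsException.
        // A real test helper would wait or retry here.
        admin.createTopics(toNewTopics(names)).all().get()
      }
    } finally admin.close()
  }
}
```

A call such as TopicReset.recreateAll("localhost:9092") could go in beforeEach, with the address replaced by that of the test broker. Because of the asynchronous deletion noted in the comments, giving each test its own topic name can be a simpler way to get the same isolation.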