I have some JUnit tests for code that uses a Kafka topic. The mock Kafka topics I've tried do not work, and the examples I found online are so old that they do not work with 0.8.2.1 either. How do I create a mock Kafka topic using 0.8.2.1?
To clarify: I'm choosing to use an actual embedded instance of the topic so that I test against a real instance rather than mocking the hand-off in Mockito. That way I can verify that my custom encoders and decoders actually work and that nothing fails when I switch to a real Kafka instance.
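Alongside the embedded broker, the encoder/decoder pair can also be checked on its own with a plain round trip. Below is a minimal sketch of that idea. SketchEncoder and SketchDecoder are stand-in interfaces declared here only to mirror the toBytes/fromBytes shape of Kafka 0.8's kafka.serializer.Encoder and Decoder, and Greeting and GreetingCodec are hypothetical names used for illustration, not part of the question.

```java
import java.nio.charset.StandardCharsets;

// Stand-in interfaces (assumptions) mirroring the shape of
// kafka.serializer.Encoder and kafka.serializer.Decoder in Kafka 0.8.
interface SketchEncoder<T> {
    byte[] toBytes(T value);
}

interface SketchDecoder<T> {
    T fromBytes(byte[] bytes);
}

// Hypothetical payload type, used only for illustration.
final class Greeting {
    final String text;

    Greeting(String text) {
        this.text = text;
    }
}

// Hypothetical custom codec: encodes the payload text as UTF-8 bytes.
final class GreetingCodec implements SketchEncoder<Greeting>, SketchDecoder<Greeting> {
    @Override
    public byte[] toBytes(Greeting value) {
        return value.text.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Greeting fromBytes(byte[] bytes) {
        return new Greeting(new String(bytes, StandardCharsets.UTF_8));
    }
}

class CodecRoundTripSketch {
    // Encodes and then decodes a value, which is what a trip through a topic
    // does to the payload from the codec's point of view.
    static Greeting roundTrip(GreetingCodec codec, Greeting in) {
        return codec.fromBytes(codec.toBytes(in));
    }

    public static void main(String[] args) {
        Greeting out = roundTrip(new GreetingCodec(), new Greeting("test-message"));
        System.out.println(out.text);
    }
}
```

This only covers the byte-level logic. A test against an embedded broker is still what shows that the producer and consumer are actually configured to use the codec.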
Testing a Kafka Consumer: Consuming data from Kafka consists of two main steps. First, we have to subscribe to topics or assign topic partitions manually. Second, we poll batches of records using the poll method. The polling is usually done in an infinite loop.
If you want to write integration tests using EmbeddedKafka, you can do something like this. Assume we have a KafkaListener that accepts a RequestDto as its payload. In your test class, create a TestConfiguration that defines the producer beans so that a KafkaTemplate can be autowired into your test.
You can mock the template, but it's better to mock the interface.
Sender sender = new Sender();
KafkaOperations template = mock(KafkaOperations.class);
SettableListenableFuture<SendResult<String, String>> future = new SettableListenableFuture<>();
when(template.send(anyString(), any(Message.
https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java
The example at the link above has been updated to work with version 0.8.2.2. Here is the code, together with its Maven dependencies:
pom.xml:
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
        <classifier>test</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
    </dependency>
</dependencies>
KafkaProducerTest.java:
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;
/**
* For online documentation
* see
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
* https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala
* https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
*/
public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {
        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port, true);
        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        String[] arguments = new String[]{"--topic", topic, "--partitions", "1", "--replication-factor", "1"};
        // create topic
        TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));

        List<KafkaServer> servers = new ArrayList<KafkaServer>();
        servers.add(kafkaServer);
        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer producer = new Producer(producerConfig);

        // setup simple consumer
        Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));

        // send message
        KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8));
        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);
        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        producer.close();

        // deleting zookeeper information to make sure the consumer starts from the beginning
        // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
        zkClient.delete("/consumers/group0");

        // starting consumer
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        if (iterator.hasNext()) {
            String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println(msg);
            assertEquals("test-message", msg);
        } else {
            fail();
        }

        // cleanup
        consumer.shutdown();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}
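One caveat with the test above: the consumer properties are created with a timeout of -1, and as far as I can tell that makes iterator.hasNext() block until a message arrives, so a lost message would hang the test rather than reach fail(). A rough sketch of one way to bound the wait using only the JDK is shown here. BoundedWaitSketch and callWithTimeout are hypothetical names, and the lambdas assume Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedWaitSketch {

    // Runs a potentially blocking call on a helper thread and fails with an
    // AssertionError (like JUnit's fail()) if no result arrives within timeoutMs.
    static <T> T callWithTimeout(Callable<T> blockingCall, long timeoutMs) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new AssertionError("no result within " + timeoutMs + " ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // The blocking call itself threw; surface its cause.
            throw new RuntimeException(e.getCause());
        } finally {
            // Interrupt the helper thread if it is still blocked, then release it.
            future.cancel(true);
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // A call that returns promptly.
        String fast = callWithTimeout(() -> "test-message", 1000);
        System.out.println(fast);

        // A call that blocks far longer than the limit.
        try {
            callWithTimeout(() -> { Thread.sleep(60_000); return "late"; }, 100);
        } catch (AssertionError expected) {
            System.out.println("gave up: " + expected.getMessage());
        }
    }
}
```

In the test, the blocking check could then be written as callWithTimeout(() -> iterator.hasNext(), 5000). Simpler alternatives that should have the same effect are JUnit 4's @Test(timeout = ...) or passing a positive consumer timeout instead of -1.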
Be sure to run mvn dependency:tree and check for conflicting libraries. I had to add exclusions for slf4j-log4j12 and log4j:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <classifier>test</classifier>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
Another option I'm looking into is Apache Curator, as suggested in the question "Is it possible to start a zookeeper server instance in process, say for unit tests?":
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.2.0-incubating</version>
    <scope>test</scope>
</dependency>
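import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;

TestingServer zkTestServer;
CuratorFramework cli; // Curator client connected to the test server
@Before
public void startZookeeper() throws Exception {
    zkTestServer = new TestingServer(2181); // in-process ZooKeeper on port 2181
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000));
    cli.start(); // the client must be started before it is used
}

@After
public void stopZookeeper() throws IOException {
    cli.close();
    zkTestServer.stop();
}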
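The snippet above binds ZooKeeper to the fixed port 2181, which can clash with a ZooKeeper that is already running on the build machine. The embedded Kafka test avoids this for the broker by calling TestUtils.choosePort(). A minimal JDK-only sketch of the same idea follows. FreePortSketch and findFreePort are hypothetical names, and the result could be passed to the TestingServer constructor in place of 2181.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

class FreePortSketch {

    // Asks the operating system for an unused port by binding to port 0,
    // reads the port that was assigned, and releases it again.
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("free port: " + findFreePort());
    }
}
```

There is a small race here, since another process could grab the port between the moment it is released and the moment the test server binds to it. For a single test run that is usually acceptable.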
Have you tried mocking the Kafka consumer objects with a mocking framework like Mockito?