I'm looking for a way to test a Kafka Streams application, so that I can define the input events and have the test suite show me the output.
Is this possible without a real Kafka setup?
1. Define the input data (aka test fixture).
2. Send it to the mocks of the input topics (TestInputTopic).
3. Read what was sent by the topology from the mocks of the output topics (TestOutputTopic).
4. Validate the result, comparing it to what was expected.
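The four steps above can be sketched as follows. This is only a minimal illustration, assuming kafka-streams-test-utils 2.4 or later is on the test classpath (TestInputTopic and TestOutputTopic are not available in earlier releases). The class name, the topic names, and the upper-casing topology are hypothetical stand-ins for your own application.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class UppercaseTopologyCheck {

    // Hypothetical topology under test: upper-cases every value.
    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .mapValues(value -> value.toUpperCase())
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    // Pipes one record through the topology and returns the value it emitted.
    static String process(String key, String value) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(buildTopology(), config)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            TestOutputTopic<String, String> output = driver.createOutputTopic(
                    "output-topic", Serdes.String().deserializer(), Serdes.String().deserializer());

            input.pipeInput(key, value);   // step 2: send the fixture to the mocked input topic
            return output.readValue();     // step 3: read what the topology produced
        }
    }
}
```

Step 4 is then an ordinary assertion in the test framework of your choice, for example comparing process("key", "hello") with "HELLO".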
A topology is a directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application. It can be created directly (as part of the low-level Processor API) or indirectly using the Streams DSL, the high-level stream processing DSL.
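To make the two construction styles concrete, here is a small sketch that builds the same pass-through topology both ways. The class, node, and topic names are made up for illustration, and it assumes the kafka-streams artifact is on the classpath.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyShapes {

    // Processor API: name each node and wire it to its parent explicitly.
    static Topology direct() {
        Topology topology = new Topology();
        topology.addSource("source", "input-topic");
        topology.addSink("sink", "output-topic", "source");
        return topology;
    }

    // Streams DSL: describe the stream; the builder generates the nodes.
    static Topology viaDsl() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }

    public static void main(String[] args) {
        // describe() gives a textual view of the graph, handy when debugging a test.
        System.out.println(direct().describe());
        System.out.println(viaDsl().describe());
    }
}
```

Either Topology object can be handed to a test driver; the driver does not care which API produced it.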
You will need Kafka brokers and their dependencies running for this solution to work; a Docker Compose file and/or some scripts can bring up an environment for tests. Another way is to implement your own project with the Kafka libraries and use them to send and receive messages in the tests.
Update Kafka 1.1.0 (released 23-Mar-2018):
KIP-247 added official test utils. Per the Upgrade Guide:
There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.
From the documentation:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams-test-utils</artifactId>
    <version>1.1.0</version>
    <scope>test</scope>
</dependency>
The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with the manually piped-in data records. The test driver captures the result records and allows you to query its embedded state stores:
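import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
// Create your topology (add its sources, processors, and sinks here)
Topology topology = new Topology();
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);
// Feed input data; the value type must match the Integer serializer
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
testDriver.pipeInput(factory.create("key", 42));
// Verify output; the deserializers must match the record's declared types
ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer());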
See the documentation for details.
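Querying the driver's embedded state stores, as mentioned above, might look like the sketch below. It is not taken from the documentation: the store name "counts", the class name, and the counting topology are hypothetical, and it assumes kafka-streams-test-utils 2.4 or later (it uses TestInputTopic rather than ConsumerRecordFactory). An in-memory store is used so that the test leaves no state on disk.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CountStoreCheck {

    // Pipes one record per given key, then returns the count stored for lookupKey.
    static Long countAfter(String lookupKey, String... keys) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
               // "counts" is a hypothetical store name chosen for this sketch
               .count(Materialized.as(Stores.inMemoryKeyValueStore("counts")));

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never contacted

        try (TopologyTestDriver driver = new TopologyTestDriver(builder.build(), config)) {
            TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-topic", Serdes.String().serializer(), Serdes.String().serializer());
            for (String key : keys) {
                input.pipeInput(key, "ignored"); // only the key matters for counting
            }
            // Look the store up by name and read the aggregated value directly.
            KeyValueStore<String, Long> store = driver.getKeyValueStore("counts");
            return store.get(lookupKey);
        }
    }
}
```

Reading the store directly is useful when the topology only materializes state and writes nothing to an output topic.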
ProcessorTopologyTestDriver is available as of 0.11.0.0. It is available in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
You will also need to add the kafka-clients test artifact:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
    <classifier>test</classifier>
    <scope>test</scope>
</dependency>
Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:
StringSerializer strSerializer = new StringSerializer();
StringDeserializer strDeserializer = new StringDeserializer();
Properties props = new Properties();
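// application.id is a required StreamsConfig setting; "test" is an arbitrary value
props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");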
props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
StreamsConfig config = new StreamsConfig(props);
TopologyBuilder builder = ...
ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
You can feed input into the topology as though you had actually written to one of the input topics:
driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);
And read output topics:
ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);
Then you can assert on these results.
Since you are asking whether it is possible to test a Kafka Streams application without a real Kafka setup, you might try the Mocked Streams library for Scala. Mocked Streams 1.0 is a library for Scala >= 2.11.8 that lets you unit-test processing topologies of Kafka Streams applications (Apache Kafka >= 0.10.1) without ZooKeeper and Kafka brokers. Reference: https://github.com/jpzk/mockedstreams
You can also use scalatest-embedded-kafka, a library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.10.1.1 and ZooKeeper 3.4.8.
Reference: https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams
Good luck!
Spring Kafka has support for unit testing with an embedded Kafka broker. See https://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation
Also, the Kafka team is working on releasing a test driver for Streams: https://issues.apache.org/jira/browse/KAFKA-3625
You can just run a single ZooKeeper instance and a single broker locally to test a Kafka Streams application.
Just follow these quick start guides:
Also check out these Kafka Streams examples (with detailed walk-through instructions in the JavaDocs):