
Test Kafka Streams topology

I'm looking for a way to test a Kafka Streams application, so that I can define the input events and have the test suite show me the output.

Is this possible without a real Kafka setup?

asked Jan 24 '17 by imehl



4 Answers

Update Kafka 1.1.0 (released 23-Mar-2018):

KIP-247 added official test utils. Per the Upgrade Guide:

There is a new artifact kafka-streams-test-utils providing a TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier class. You can include the new artifact as a regular dependency to your unit tests and use the test driver to test your business logic of your Kafka Streams application. For more details, see KIP-247.

From the documentation:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>1.1.0</version>
        <scope>test</scope>
    </dependency>

The test driver simulates the library runtime that continuously fetches records from the input topics and processes them by traversing the topology. You can use the test driver to verify that your specified processor topology computes the correct result with manually piped-in data records. The test driver captures the resulting records and allows you to query its embedded state stores:

    // Create your topology (add your sources, processors, and sinks here)
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); // never actually connected to

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new IntegerSerializer());
    testDriver.pipeInput(factory.create("key", 42));

    // Verify output (the deserializers must match the topology's output types)
    ProducerRecord<String, Integer> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer());

See the documentation for details.
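The OutputVerifier class from the same artifact can then do the assertions. A minimal sketch, assuming the topology above simply forwards the record to output-topic unchanged (the expected key and value are placeholders):

    // Assert on key and value of the captured record; throws AssertionError on mismatch.
    OutputVerifier.compareKeyValue(outputRecord, "key", 42);

    // readOutput returns null once no more records were captured for the topic (JUnit assertion).
    assertNull(testDriver.readOutput("output-topic", new StringDeserializer(), new IntegerDeserializer()));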


ProcessorTopologyTestDriver is available as of 0.11.0.0. It ships in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

You will also need to add the kafka-clients test artifact:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    props.setProperty(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.setProperty(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);
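The TopologyBuilder itself is up to you. As one hypothetical placeholder, the 0.11 DSL entry point KStreamBuilder (a subclass of TopologyBuilder) could supply a simple pass-through topology; the topic names below mirror the ones used in the rest of this answer and are otherwise assumptions:

    // Hypothetical pass-through topology: forward records from "input-topic"
    // to "output-topic-1" unchanged. KStreamBuilder extends TopologyBuilder,
    // so it can be handed to ProcessorTopologyTestDriver directly.
    KStreamBuilder builder = new KStreamBuilder();
    builder.stream(Serdes.String(), Serdes.String(), "input-topic")
           .to(Serdes.String(), Serdes.String(), "output-topic-1");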

You can feed input into the topology as though you had actually written to one of the input topics:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

And read output topics:

    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

Then you can assert on these results.
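For example, with plain JUnit assertions (the expected keys and values are placeholders and assume a pass-through topology):

    // Check where each record landed and what it carried.
    assertEquals("output-topic-1", record1.topic());
    assertEquals("key1", record1.key());
    assertEquals("value1", record1.value());

    // readOutput returns null once no more records were captured for a topic.
    assertNull(driver.readOutput("output-topic-2", strDeserializer, strDeserializer));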

answered by Dmitry Minkovsky


  1. As you are asking whether it is possible to test a Kafka Streams application without a real Kafka setup, you might try the Mocked Streams library in Scala. Mocked Streams 1.0 is a library for Scala >= 2.11.8 that lets you unit-test processing topologies of Kafka Streams applications (Apache Kafka >= 0.10.1) without ZooKeeper and Kafka brokers. Reference: https://github.com/jpzk/mockedstreams

  2. You can also use scalatest-embedded-kafka, a library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.10.1.1 and ZooKeeper 3.4.8.
    Reference: https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams

Good luck!

answered by Slim Baltagi


Spring Kafka has support for unit testing with an embedded Kafka broker; see https://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation.
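Besides the @EmbeddedKafka annotation (which needs a Spring test context), the same spring-kafka-test artifact exposes the embedded broker as a JUnit rule. A minimal sketch using the 2.1.x rule class; the test class name and topic names are assumptions:

    // Hypothetical JUnit 4 test: KafkaEmbedded (spring-kafka-test 2.1.x) spins up
    // an in-memory broker with the given topics for the duration of the class.
    public class StreamsIntegrationTest {

        @ClassRule
        public static KafkaEmbedded embeddedKafka =
                new KafkaEmbedded(1, true, "input-topic", "output-topic");

        @Test
        public void processesRecords() throws Exception {
            // Point your Streams config at embeddedKafka.getBrokersAsString(),
            // produce test records to "input-topic", start the topology,
            // then consume from "output-topic" and assert on the results.
        }
    }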

The Kafka team is also working on releasing a test driver for Streams: https://issues.apache.org/jira/browse/KAFKA-3625.

answered by Jacob Botuck


You can just run a single ZooKeeper instance and broker locally to test a Kafka Streams application.

Just follow these quick start guides:

  • local ZK and broker setup: http://kafka.apache.org/quickstart
  • http://docs.confluent.io/current/streams/quickstart.html

Also check out these Kafka Streams examples (with detailed walk-through instructions in the JavaDocs):

  • https://github.com/confluentinc/examples/tree/3.1.x/kafka-streams

answered by Matthias J. Sax