Kafka Streams testing: java.util.NoSuchElementException: Uninitialized topic: "output_topic_name"

I've written a test class for a Kafka Streams application as per https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html ; the code is:

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

import static org.junit.Assert.assertEquals;

public class KafkaStreamsConfigTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String test_dummy = "dummy:1234";

@Before
public void setup() {
    Topology topology = new Topology();

    topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);

    topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);

    topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new
            , ProcessRouter.class.getSimpleName());

    topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic
            , WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic
            , WorkforceVisit.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);       
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());

    testDriver = new TopologyTestDriver(topology, properties);

    //setup test topics
    inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
    outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());

}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    Object b =  outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);

}
}

where I use the EventSerde class to serialize and deserialize the value.

When I run this test it fails with java.util.NoSuchElementException: Uninitialized topic: processed_events and the following stacktrace:

java.util.NoSuchElementException: Uninitialized topic: processed_events

at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

As you can see, I have initialized both the input and output topics. I have also debugged the code; the error occurs when I read the value from the output topic:

outputTopic.readValue();

I don't understand what else I should do to initialize the outputTopic. Can anyone help me with this problem?

I am using Apache kafka-streams-test-utils 2.4.0 and kafka-streams 2.4.0:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.4.0</version>
        <scope>test</scope>
    </dependency>
Asked Jan 30 '20 by Arbaz Sheikh



1 Answer

To avoid this exception, check that the output topic is not empty before trying to read from it. The test driver only initializes an output topic's backing queue once a record actually reaches that sink, so the empty check also tells you whether your record was routed to processed_events at all.

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    assertFalse(outputTopic.isEmpty());
    Object b = outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(expected_value, b);
}
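For intuition about why the topic is reported as "uninitialized" rather than merely empty: the driver creates an output topic's record queue only when the first record is produced to that sink, so a readValue() before any record arrives has no queue to read from. Below is a simplified stdlib model of that behavior (a hypothetical class for illustration, not the real TopologyTestDriver internals):

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Simplified model: the queue for a sink topic stays null until the
// first record reaches it, so reading from a topic that never received
// a record fails with NoSuchElementException, as in the question.
public class OutputTopicModel {
    private Queue<Object> records; // null until the first record "arrives"

    public void produce(Object value) {
        if (records == null) {
            records = new ArrayDeque<>();
        }
        records.add(value);
    }

    public boolean isEmpty() {
        return records == null || records.isEmpty();
    }

    public Object readValue() {
        if (records == null) {
            // Mirrors the "Uninitialized topic" error from the stacktrace.
            throw new NoSuchElementException("Uninitialized topic: processed_events");
        }
        return records.remove();
    }

    public static void main(String[] args) {
        OutputTopicModel topic = new OutputTopicModel();
        try {
            topic.readValue();
        } catch (NoSuchElementException e) {
            System.out.println(e.getMessage());
        }
        topic.produce("real value");
        if (!topic.isEmpty()) {
            System.out.println(topic.readValue());
        }
    }
}
```

So if the guarded read still reports the topic as empty, the record was most likely routed to the other sink (cache_objects) or dropped by one of the processors, rather than anything being wrong with how the test topics were created.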
Answered Oct 13 '22 by forevergenin