How can do Functional tests for Kafka Streams with Avro (schemaRegistry)?

  • A brief explanation of what I want to achieve: I want to write functional tests for a Kafka Streams topology (using TopologyTestDriver) that processes Avro records.

  • Issue: I can't "mock" the Schema Registry to automate schema publishing/reading.

What I have tried so far is using MockSchemaRegistryClient to mock the Schema Registry, but I don't know how to link it to the Avro Serde.
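One way to make that link (a sketch, assuming Confluent 4.x/5.x, where SpecificAvroSerde still exposes a constructor that accepts a SchemaRegistryClient) is to inject the mock client directly into the serde, so the serializer and deserializer share the same in-memory registry:

```java
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;

import java.util.Collections;
import java.util.Map;

public class MockRegistrySerdeFactory {

    // Single mock client shared by every serde built here, so a schema
    // registered during serialization is found again during deserialization.
    private final SchemaRegistryClient registryClient = new MockSchemaRegistryClient();

    public <T extends SpecificRecord> SpecificAvroSerde<T> createSerde() {
        // Constructor injection of the client (deprecated in newer Confluent
        // releases, but available in 4.x/5.x).
        SpecificAvroSerde<T> serde = new SpecificAvroSerde<>(registryClient);

        // configure() is still mandatory; the URL is never contacted because
        // the injected client handles all registry calls.
        Map<String, String> config = Collections.singletonMap(
                KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://unused:8081");
        serde.configure(config, false); // false = value serde
        return serde;
    }
}
```

On newer Confluent versions (5.3+, if I recall correctly), an alternative is to set `schema.registry.url` to a `mock://some-scope` pseudo-URL, which makes the serdes resolve a shared MockSchemaRegistryClient internally without any constructor injection.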

Code

public class SyncronizerIntegrationTest {


    private ConsumerRecordFactory<String, Tracking> recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();


    @Test
    void integrationTest() throws IOException, RestClientException {


        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); // Dunno if this does anything? :/
        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());


        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null).to("output");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));

    }
}

AvroSerde method

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String,String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}

If I run the test without testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking())); it works fine (so everything looks properly set up).

But

when I try to insert data (pipeInput), it throws the following exception. (The Tracking object is fully populated.)

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
    at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

Edit: I didn't delete the code above; it's kept as a "history log" to show the path I followed.

Ramon jansen gomez asked Dec 23 '22

1 Answer

Confluent provides plenty of example code for testing Kafka Streams together with the Schema Registry.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Most importantly, mocking isn't a complete integration test; starting an actual Kafka broker with an in-memory schema registry is.

In the above code, see

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

And

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
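Putting those two pieces together, the test configuration in the linked example looks roughly like this (a sketch following the names used in SpecificAvroIntegrationTest; the embedded cluster supplies both the broker and the registry URL):

```java
// Build Streams configuration pointing at the embedded cluster started by
// the @ClassRule above; CLUSTER provides the broker and registry endpoints.
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "specific-avro-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
```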
OneCricketeer answered Feb 15 '23