Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Mocking Kafka APIs for Unit Testing

I want to mock the Confluent Kafka APIs for Consumer and Producer in Go for unit testing. Is there any way (process/steps/library) to mock them successfully?

like image 838
Priyadarshan Mohanty Avatar asked Mar 01 '26 18:03

Priyadarshan Mohanty


1 Answer

I'd suggest doing this without any mocking libraries, especially if you're just getting started with unit testing and mocking. Start by abstracting Kafka behind your own interface and implementation:

// producer.go
package producer

// Producer is the abstraction the application depends on in place of a
// concrete Kafka producer, so that tests can substitute their own
// implementation.
//
// NOTE(review): the method set looks like it mirrors *kafka.Producer from
// confluent-kafka-go — confirm against the library version in use. Any test
// double assigned to a Producer value must implement every method listed
// here; ProducerMock in this file only defines Produce, so either trim this
// interface to what the application actually calls or add the remaining
// methods to the mock.
type Producer interface {
    Close()
    Events() chan kafka.Event
    Flush(timeoutMs int) int
    GetFatalError() error
    GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
    Len() int
    OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
    Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
    ProduceChannel() chan *kafka.Message
    QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
    SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
    SetOAuthBearerTokenFailure(errstr string) error
    String() string
    TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode
}

// KProducer is a field-less placeholder for the concrete producer
// implementation; the methods declared on it in this file are stubs.
type KProducer struct{}

// Produce is a stub: it ignores msg and deliveryChan and always returns nil.
func (k *KProducer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
    return nil
}

// LastProduced is a stub that always returns nil; KProducer keeps no record
// of produced messages.
func (k *KProducer) LastProduced() *ProducerMessage {
    return nil
}

// ProducedAt is a stub that ignores the index and always returns nil.
func (k *KProducer) ProducedAt(_ int) *ProducerMessage {
    return nil
}

And then create a ProducerMock like this:

// producer_mock.go
package producer

import (
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "testing"
)

// ProducerMock is an in-memory test double: its Produce method records the
// messages it is given instead of sending them anywhere.
type ProducerMock struct {
    // ProducedMessages holds one entry per Produce call, in call order.
    ProducedMessages []*ProducerMessage
    // ProducedCount is incremented by one on every Produce call.
    ProducedCount int
}

// Produce wraps msg in a ProducerMessage, appends it to ProducedMessages and
// bumps ProducedCount. It never fails and does not use deliveryChan.
func (p *ProducerMock) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
    recorded := &ProducerMessage{Message: msg}
    p.ProducedMessages = append(p.ProducedMessages, recorded)
    p.ProducedCount++
    return nil
}

// LastProduced returns the most recently recorded message, or nil when
// nothing has been recorded yet.
func (p *ProducerMock) LastProduced() *ProducerMessage {
    if n := len(p.ProducedMessages); n > 0 {
        return p.ProducedMessages[n-1]
    }
    return nil
}

// ProducedAt returns the idx-th recorded message (zero-based, in call order).
// It returns nil when idx is outside the recorded range, including negative
// indices, which would otherwise panic on the slice access.
func (p *ProducerMock) ProducedAt(idx int) *ProducerMessage {
    if idx < 0 || idx >= len(p.ProducedMessages) {
        return nil
    }
    return p.ProducedMessages[idx]
}

// AssertProducedCount reports a non-fatal test error on t when ProducedCount
// differs from n.
func (p *ProducerMock) AssertProducedCount(t *testing.T, n int) {
    // Mark this as a helper so a failure is attributed to the calling test's
    // line rather than to the t.Errorf call below.
    t.Helper()
    if p.ProducedCount != n {
        t.Errorf("ProducedCount should be equal %d but it's equal %d", n, p.ProducedCount)
    }
}

Then, in your tests, use the ProducerMock together with the assertion methods you created:

// producer_test.go
package producer

import (
    "encoding/json"
    "github.com/confluentinc/confluent-kafka-go/kafka"
    "testing"
)

// AppEvent is the example payload; sendMessage JSON-encodes it as the message
// value.
type AppEvent struct {
    // Foo is encoded under the JSON key "foo".
    Foo string `json:"foo"`
}

// MyApp is the example application under test. It publishes through the
// Producer interface so a test can inject a mock.
type MyApp struct {
    // producer is the destination sendMessage publishes to.
    producer Producer
}

// sendMessage JSON-encodes a fixed AppEvent{Foo: "Bar"} and hands it to the
// app's producer, addressed to topic "topic1", partition 1, offset 4, with a
// nil delivery channel.
//
// NOTE(review): both the Marshal error and the Produce error are discarded;
// sendMessage has no return value through which to report them.
func (a *MyApp) sendMessage() {
    topic := "topic1"
    payload, _ := json.Marshal(AppEvent{Foo: "Bar"})

    destination := kafka.TopicPartition{
        Topic:     &topic,
        Partition: 1,
        Offset:    4,
    }
    msg := &kafka.Message{
        TopicPartition: destination,
        Value:          payload,
    }
    _ = a.producer.Produce(msg, nil)
}

// TestProduce checks that MyApp.sendMessage publishes exactly one message
// through the injected ProducerMock and that the recorded message carries the
// expected topic, partition, offset and JSON value.
func TestProduce(t *testing.T) {
    p := ProducerMock{}
    app := MyApp{&p}
    p.AssertProducedCount(t, 0)
    app.sendMessage()
    p.AssertProducedCount(t, 1)

    // AssertProducedCount only reports a non-fatal error, so stop here if
    // nothing was recorded instead of calling methods on a nil message.
    last := p.LastProduced()
    if last == nil {
        t.Fatal("sendMessage did not record any message on the mock")
    }
    last.AssertTopic(t, "topic1")
    last.AssertPartition(t, 1)
    last.AssertOffset(t, 4)
    last.AssertValueEquals(t, AppEvent{
        Foo: "Bar",
    })
}

Finally, do the same for your consumer and you can keep expanding it to test for your specific needs...

like image 90
Fabio Martins Avatar answered Mar 04 '26 19:03

Fabio Martins



Donate For Us

If you love us, you can donate via PayPal or buy us a coffee so we can maintain and grow! Thank you!