I want to mock the Confluent Kafka APIs for Consumer and Producer in GO for Unit Testing, Is there any way (process/steps/library) to mock them successfully?
I'd suggest that you do this without any mocking libraries, just in case you're getting started with unit testing and mocking. Start by abstracting kafka with your own implementation:
// producer.go
package producer
type Producer interface {
Close()
Events() chan kafka.Event
Flush(timeoutMs int) int
GetFatalError() error
GetMetadata(topic *string, allTopics bool, timeoutMs int) (*kafka.Metadata, error)
Len() int
OffsetsForTimes(times []kafka.TopicPartition, timeoutMs int) (offsets []kafka.TopicPartition, err error)
Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error
ProduceChannel() chan *kafka.Message
QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
SetOAuthBearerToken(oauthBearerToken kafka.OAuthBearerToken) error
SetOAuthBearerTokenFailure(errstr string) error
String() string
TestFatalError(code kafka.ErrorCode, str string) kafka.ErrorCode
}
type KProducer struct {}
func (k *KProducer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
return nil
}
func (k *KProducer) LastProduced() *ProducerMessage {
return nil
}
func (k *KProducer) ProducedAt(_ int) *ProducerMessage {
return nil
}
And then create a ProducerMock like this:
// producer_mock.go
package producer
import (
"github.com/confluentinc/confluent-kafka-go/kafka"
"testing"
)
type ProducerMock struct {
ProducedMessages []*ProducerMessage
ProducedCount int
}
func (p * ProducerMock) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
p.ProducedMessages = append(p.ProducedMessages, &ProducerMessage{
Message: msg,
})
p.ProducedCount += 1
return nil
}
func (p * ProducerMock) LastProduced() *ProducerMessage {
if len(p.ProducedMessages) == 0 {
return nil
}
return p.ProducedMessages[len(p.ProducedMessages) - 1]
}
func (p * ProducerMock) ProducedAt(idx int) *ProducerMessage {
if idx >= len(p.ProducedMessages) {
return nil
}
return p.ProducedMessages[idx]
}
func (p * ProducerMock) AssertProducedCount(t *testing.T, n int) {
if p.ProducedCount != n {
t.Errorf("ProducedCount should be equal %d but it's equal %d", n, p.ProducedCount)
}
}
And when testing, try using the ProducerMock and using the assertion methods created:
// producer_test.go
package producer
import (
"encoding/json"
"github.com/confluentinc/confluent-kafka-go/kafka"
"testing"
)
type AppEvent struct {
Foo string `json:"foo"`
}
type MyApp struct {
producer Producer
}
func (a * MyApp) sendMessage() {
event := AppEvent{
Foo: "Bar",
}
value, _ := json.Marshal(event)
topic := "topic1"
_ = a.producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{
Topic: &topic,
Partition: 1,
Offset: 4,
},
Value: value,
}, nil)
}
func TestProduce(t *testing.T) {
p := ProducerMock{}
app := MyApp{&p}
p.AssertProducedCount(t, 0)
app.sendMessage()
p.AssertProducedCount(t, 1)
p.LastProduced().AssertTopic(t, "topic1")
p.LastProduced().AssertPartition(t, 1)
p.LastProduced().AssertOffset(t, 4)
p.LastProduced().AssertValueEquals(t, AppEvent{
Foo: "Bar",
})
}
Finally, do the same for your consumer and you can keep expanding it to test for your specific needs...
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With