Using Kafka-Go, why am I seeing what appears to be batching reads/writes? Is there a config I am missing?

I'm switching from RabbitMQ to Kafka, and this is a simple spike to see how Kafka operates. I'm not sure whether there are settings I'm missing, whether the problem is in my code or in Kafka-Go, or whether this is expected Kafka behavior.

I've tried adjusting the BatchSize as well as the BatchTimeout, but neither has had an impact.

The code below creates a topic with 6 partitions and a replication factor of 3, then produces an incrementing message every 100 ms. It launches 6 consumers, one per partition. Both reading and writing are performed in goroutines.

In the log below, the consumers go seven seconds or more without receiving a message and then receive bursts. I'm using Confluent's platform, so I expect some network latency, but not to the degree that I'm seeing.

package main

import (
    "context"
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "log"
    "net"
    "strconv"
    "time"

    kafka "github.com/segmentio/kafka-go"
    "github.com/segmentio/kafka-go/sasl/plain"
)

func newDialer(clientID, username, password string) *kafka.Dialer {
    mechanism := plain.Mechanism{
        Username: username,
        Password: password,
    }

    rootCAs, _ := x509.SystemCertPool()
    if rootCAs == nil {
        rootCAs = x509.NewCertPool()
    }

    return &kafka.Dialer{
        Timeout:       10 * time.Second,
        DualStack:     true,
        ClientID:      clientID,
        SASLMechanism: mechanism,
        TLS: &tls.Config{
            InsecureSkipVerify: false,
            RootCAs:            rootCAs,
        },
    }
}

func createTopic(url string, topic string, dialer *kafka.Dialer) {
    conn, err := dialer.Dial("tcp", url)
    if err != nil {
        panic(err.Error())
    }
    defer conn.Close()

    controller, err := conn.Controller()
    if err != nil {
        panic(err.Error())
    }

    var controllerConn *kafka.Conn
    controllerConn, err = dialer.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
    if err != nil {
        panic(err.Error())
    }
    defer controllerConn.Close()

    topicConfigs := []kafka.TopicConfig{
        {
            Topic:             topic,
            NumPartitions:     6,
            ReplicationFactor: 3,
        },
    }

    err = controllerConn.CreateTopics(topicConfigs...)
    if err != nil {
        panic(err.Error())
    }
}

func newWriter(url string, topic string, dialer *kafka.Dialer) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:      []string{url},
        Topic:        topic,
        Balancer:     &kafka.CRC32Balancer{},
        Dialer:       dialer,
        BatchSize:    10,
        BatchTimeout: 1 * time.Millisecond,
    })
}

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {

    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
    })
}

func read(url string, topic string, dialer *kafka.Dialer, partition int) {

    reader := newReader(url, topic, partition, dialer)
    defer reader.Close()
    for {
        msg, err := reader.ReadMessage(context.Background())
        if err != nil {
            panic(err)
        }
        log.Printf("rec%d:\t%s\n", partition, msg.Value)
    }
}

func write(url string, topic string, dialer *kafka.Dialer) {
    writer := newWriter(url, topic, dialer)
    defer writer.Close()
    for i := 0; ; i++ {
        v := []byte("V" + strconv.Itoa(i))
        log.Printf("send:\t%s\n", v)
        msg := kafka.Message{Key: v, Value: v}
        err := writer.WriteMessages(context.Background(), msg)
        if err != nil {
            fmt.Println(err)
        }
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    url := "_______.______.___.confluent.cloud:9092"
    topic := "test"
    username := "________________"
    password := "________________"
    clientID := "________________"
    dialer := newDialer(clientID, username, password)
    ctx := context.Background()
    createTopic(url, topic, dialer)
    for i := 0; i < 6; i++ {
        go read(url, topic, dialer, i)
    }

    go write(url, topic, dialer)
    <-ctx.Done()
}

This produces the log below.

2020/11/02 23:19:22 send:       V0
2020/11/02 23:19:23 send:       V1
2020/11/02 23:19:23 send:       V2
2020/11/02 23:19:23 send:       V3
2020/11/02 23:19:24 send:       V4
2020/11/02 23:19:24 send:       V5
2020/11/02 23:19:24 send:       V6
2020/11/02 23:19:25 send:       V7
2020/11/02 23:19:25 send:       V8
2020/11/02 23:19:25 send:       V9
2020/11/02 23:19:25 send:       V10
2020/11/02 23:19:26 send:       V11
2020/11/02 23:19:26 send:       V12
2020/11/02 23:19:26 send:       V13
2020/11/02 23:19:26 send:       V14
2020/11/02 23:19:26 send:       V15
2020/11/02 23:19:27 send:       V16
2020/11/02 23:19:27 send:       V17
2020/11/02 23:19:27 send:       V18
2020/11/02 23:19:27 send:       V19
2020/11/02 23:19:28 send:       V20
2020/11/02 23:19:29 send:       V21
2020/11/02 23:19:29 send:       V22
2020/11/02 23:19:29 send:       V23
2020/11/02 23:19:29 send:       V24
2020/11/02 23:19:29 send:       V25
2020/11/02 23:19:30 send:       V26
2020/11/02 23:19:30 send:       V27
2020/11/02 23:19:30 send:       V28
2020/11/02 23:19:30 send:       V29
2020/11/02 23:19:31 send:       V30
2020/11/02 23:19:31 send:       V31
2020/11/02 23:19:31 send:       V32
2020/11/02 23:19:32 send:       V33
2020/11/02 23:19:32 send:       V34
2020/11/02 23:19:32 rec3:       V8
2020/11/02 23:19:32 rec3:       V14
2020/11/02 23:19:32 rec3:       V15
2020/11/02 23:19:32 rec3:       V16
2020/11/02 23:19:32 rec3:       V17
2020/11/02 23:19:32 rec3:       V20
2020/11/02 23:19:32 rec3:       V21
2020/11/02 23:19:32 rec3:       V23
2020/11/02 23:19:32 rec3:       V29
2020/11/02 23:19:32 rec1:       V0
2020/11/02 23:19:32 rec1:       V9
2020/11/02 23:19:32 rec1:       V22
2020/11/02 23:19:32 rec1:       V28
2020/11/02 23:19:32 rec4:       V4
2020/11/02 23:19:32 rec4:       V5
2020/11/02 23:19:32 rec4:       V7
2020/11/02 23:19:32 rec4:       V10
2020/11/02 23:19:32 rec4:       V11
2020/11/02 23:19:32 rec4:       V12
2020/11/02 23:19:32 rec4:       V18
2020/11/02 23:19:32 rec4:       V24
2020/11/02 23:19:32 rec4:       V25
2020/11/02 23:19:32 rec4:       V30
2020/11/02 23:19:32 rec4:       V31
2020/11/02 23:19:32 send:       V35
2020/11/02 23:19:32 rec5:       V1
2020/11/02 23:19:32 rec5:       V2
2020/11/02 23:19:32 rec5:       V3
2020/11/02 23:19:32 rec5:       V34
2020/11/02 23:19:32 rec2:       V6
2020/11/02 23:19:32 rec2:       V13
2020/11/02 23:19:32 rec2:       V26
2020/11/02 23:19:32 rec2:       V33
2020/11/02 23:19:32 send:       V36
2020/11/02 23:19:33 send:       V37
2020/11/02 23:19:33 send:       V38
2020/11/02 23:19:33 send:       V39
2020/11/02 23:19:33 send:       V40
2020/11/02 23:19:33 send:       V41
2020/11/02 23:19:33 rec0:       V19
2020/11/02 23:19:33 rec0:       V27
2020/11/02 23:19:33 rec0:       V32
2020/11/02 23:19:34 send:       V42
2020/11/02 23:19:34 send:       V43
2020/11/02 23:19:34 send:       V44
2020/11/02 23:19:34 send:       V45
2020/11/02 23:19:34 send:       V46
2020/11/02 23:19:35 send:       V47
2020/11/02 23:19:35 send:       V48
2020/11/02 23:19:35 send:       V49
2020/11/02 23:19:35 send:       V50
2020/11/02 23:19:35 send:       V51
2020/11/02 23:19:35 send:       V52
2020/11/02 23:19:36 send:       V53
2020/11/02 23:19:36 send:       V54
2020/11/02 23:19:36 send:       V55
2020/11/02 23:19:36 send:       V56
2020/11/02 23:19:36 send:       V57
2020/11/02 23:19:37 send:       V58
2020/11/02 23:19:37 send:       V59
2020/11/02 23:19:37 send:       V60
2020/11/02 23:19:38 send:       V61
2020/11/02 23:19:38 send:       V62
2020/11/02 23:19:38 send:       V63
2020/11/02 23:19:38 send:       V64
2020/11/02 23:19:38 send:       V65
2020/11/02 23:19:39 send:       V66
2020/11/02 23:19:39 send:       V67
2020/11/02 23:19:39 send:       V68
2020/11/02 23:19:40 send:       V69
2020/11/02 23:19:40 send:       V70
2020/11/02 23:19:40 send:       V71
2020/11/02 23:19:40 send:       V72
2020/11/02 23:19:40 send:       V73
2020/11/02 23:19:40 send:       V74
2020/11/02 23:19:41 send:       V75
2020/11/02 23:19:41 send:       V76
2020/11/02 23:19:41 rec1:       V41
2020/11/02 23:19:41 rec1:       V56
2020/11/02 23:19:41 rec1:       V68
2020/11/02 23:19:41 rec1:       V74
2020/11/02 23:19:41 rec1:       V75
2020/11/02 23:19:41 rec1:       V76
2020/11/02 23:19:41 rec3:       V37
2020/11/02 23:19:41 rec3:       V40
2020/11/02 23:19:41 rec3:       V42
2020/11/02 23:19:41 rec3:       V48
2020/11/02 23:19:41 rec3:       V55
2020/11/02 23:19:41 rec3:       V57
2020/11/02 23:19:41 rec3:       V60
2020/11/02 23:19:41 rec3:       V61
2020/11/02 23:19:41 rec3:       V62
2020/11/02 23:19:41 send:       V77
2020/11/02 23:19:41 rec4:       V38
2020/11/02 23:19:41 rec4:       V39
2020/11/02 23:19:41 rec4:       V45
2020/11/02 23:19:41 rec4:       V46
2020/11/02 23:19:41 rec4:       V47
2020/11/02 23:19:41 rec4:       V53
2020/11/02 23:19:41 rec4:       V59
2020/11/02 23:19:41 rec4:       V70
2020/11/02 23:19:41 rec4:       V71
2020/11/02 23:19:41 rec4:       V73
2020/11/02 23:19:41 rec5:       V35
2020/11/02 23:19:41 rec5:       V36
2020/11/02 23:19:41 rec5:       V43
2020/11/02 23:19:41 rec5:       V49
2020/11/02 23:19:41 rec5:       V54
2020/11/02 23:19:41 rec5:       V63
2020/11/02 23:19:41 rec5:       V69
2020/11/02 23:19:41 rec5:       V77
2020/11/02 23:19:41 send:       V78
2020/11/02 23:19:41 rec2:       V44
2020/11/02 23:19:41 rec2:       V50
2020/11/02 23:19:41 rec2:       V51
2020/11/02 23:19:41 rec2:       V64
2020/11/02 23:19:41 rec2:       V65
2020/11/02 23:19:41 rec2:       V66
2020/11/02 23:19:41 rec2:       V72
2020/11/02 23:19:41 send:       V79
2020/11/02 23:19:42 send:       V80
2020/11/02 23:19:42 send:       V81
2020/11/02 23:19:42 send:       V82
2020/11/02 23:19:42 send:       V83
2020/11/02 23:19:42 send:       V84
2020/11/02 23:19:43 send:       V85
2020/11/02 23:19:43 rec0:       V52
2020/11/02 23:19:43 rec0:       V58
2020/11/02 23:19:43 rec0:       V67
2020/11/02 23:19:43 send:       V86

Any advice would be greatly appreciated. Thanks!

Edit:

The buffering is definitely specific to Kafka-Go; Sarama does not exhibit the same behavior:

package main

import (
    "context"
    "fmt"

    "github.com/Shopify/sarama"

    "crypto/tls"
    "crypto/x509"

    "log"
    "strings"
    "time"
)

var (
    broker   = "___-_____.us-east1.gcp.confluent.cloud:9092"
    brokers  = []string{broker}
    clientID = "___________"
    username = "___________"
    password = "___________"
    topic    = "sarama"
)

func main() {

    log.Printf("Kafka brokers: %s", strings.Join(brokers, ", "))
    ctx := context.Background()
    sync := newSyncProducer()
    // accessLog := newAsyncProducer()

    createTopic(topic)

    go func() {
        for i := 0; ; i++ {
            v := sarama.StringEncoder(fmt.Sprintf("V%d", i))

            p, o, err := sync.SendMessage(&sarama.ProducerMessage{
                Topic: topic,
                Value: v,
            })
            if err != nil {
                panic(err)
            }
            fmt.Printf("sent\t\t%v\tp: %d\toffset: %d\t\n", v, p, o)
            time.Sleep(100 * time.Millisecond)
        }
    }()
    ps := []sarama.PartitionConsumer{}
    offset := int64(0)

    loop := func(msgs <-chan *sarama.ConsumerMessage) {
        for msg := range msgs {
            fmt.Printf("recv:\t\t%s\tp: %d\toffset: %d\n", msg.Value, msg.Partition, msg.Offset)
        }
    }

    for i := 0; i < 6; i++ {
        ps = append(ps, createPartitionConsumer(topic, int32(i), offset))
    }

    for _, p := range ps {
        go loop(p.Messages())
    }
    <-ctx.Done()

}

func createPartitionConsumer(topic string, partition int32, offset int64) sarama.PartitionConsumer {
    config := baseConfig()
    c, err := sarama.NewConsumer(brokers, config)

    if err != nil {
        panic(err)
    }
    p, err := c.ConsumePartition(topic, partition, offset)
    if err != nil {
        panic(err)
    }
    return p
}

func createTopic(topic string) {
    config := baseConfig()
    admin, err := sarama.NewClusterAdmin(brokers, config)
    if err != nil {
        log.Fatal("Error while creating cluster admin: ", err.Error())
    }
    defer func() { _ = admin.Close() }()
    err = admin.CreateTopic(topic, &sarama.TopicDetail{
        NumPartitions:     6,
        ReplicationFactor: 3,
    }, false)
    if err != nil {
        log.Println("Error while creating topic: ", err.Error())
    }
}

func baseConfig() *sarama.Config {
    rootCAs, _ := x509.SystemCertPool()
    if rootCAs == nil {
        rootCAs = x509.NewCertPool()
    }

    config := sarama.NewConfig()
    config.Version = sarama.MaxVersion
    config.Net.TLS.Enable = true
    config.Net.TLS.Config = &tls.Config{
        RootCAs:            rootCAs,
        InsecureSkipVerify: false,
    }

    config.ClientID = clientID
    config.Net.SASL.Enable = true
    config.Net.SASL.Password = password
    config.Net.SASL.User = username
    return config
}

func newSyncProducer() sarama.SyncProducer {

    config := baseConfig()
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 10
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(brokers, config)
    if err != nil {
        log.Fatalln("Failed to start Sarama producer:", err)
    }

    return producer
}

In fact, in some cases the receive is logged before the send is confirmed, which makes me wonder whether there is internal messaging occurring and, if so, whether I should care...

recv:           V1176   p: 1    offset: 355
recv:           V1177   p: 2    offset: 363
send:           V1177   p: 2    offset: 363
send:           V1178   p: 5    offset: 377
recv:           V1178   p: 5    offset: 377
recv:           V1179   p: 1    offset: 356
send:           V1179   p: 1    offset: 356
send:           V1180   p: 1    offset: 357
recv:           V1180   p: 1    offset: 357
recv:           V1181   p: 1    offset: 358
send:           V1181   p: 1    offset: 358
send:           V1182   p: 4    offset: 393
recv:           V1182   p: 4    offset: 393
send:           V1183   p: 4    offset: 394
recv:           V1183   p: 4    offset: 394
send:           V1184   p: 3    offset: 358
recv:           V1184   p: 3    offset: 358
send:           V1185   p: 2    offset: 364
recv:           V1185   p: 2    offset: 364
send:           V1186   p: 3    offset: 359
recv:           V1186   p: 3    offset: 359
recv:           V1187   p: 3    offset: 360
send:           V1187   p: 3    offset: 360
send:           V1188   p: 5    offset: 378
recv:           V1188   p: 5    offset: 378
send:           V1189   p: 2    offset: 365
recv:           V1189   p: 2    offset: 365
recv:           V1190   p: 4    offset: 395
send:           V1190   p: 4    offset: 395
send:           V1191   p: 1    offset: 359
recv:           V1191   p: 1    offset: 359
send:           V1192   p: 4    offset: 396
recv:           V1192   p: 4    offset: 396
send:           V1193   p: 0    offset: 431
recv:           V1193   p: 0    offset: 431
send:           V1194   p: 4    offset: 397
recv:           V1194   p: 4    offset: 397
recv:           V1195   p: 2    offset: 366
send:           V1195   p: 2    offset: 366
send:           V1196   p: 3    offset: 361
recv:           V1196   p: 3    offset: 361
asked Nov 03 '20 by Chance


1 Answer

You need to change ReaderConfig.MinBytes; otherwise, segmentio/kafka-go will set it to 1e6 (1 MB), and in that case Kafka will wait for that much data to accumulate before answering the fetch request.

func newReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {

    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
        MinBytes:  1,        // same default as Shopify/sarama
        MaxBytes:  57671680, // 55 MiB
    })
}
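
Relatedly, ReaderConfig also exposes MaxWait, which caps how long the broker will hold a fetch open when MinBytes has not yet accumulated; kafka-go defaults it to 10 seconds, which lines up with the roughly ten-second bursts in the log above. A minimal sketch (the function name is illustrative only):

func newLowLatencyReader(url string, topic string, partition int, dialer *kafka.Dialer) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{url},
        Topic:     topic,
        Dialer:    dialer,
        Partition: partition,
        MinBytes:  1,                      // respond as soon as any data is available
        MaxBytes:  10e6,                   // ~10 MB upper bound per fetch
        MaxWait:   250 * time.Millisecond, // cap the broker-side wait (kafka-go default: 10s)
    })
}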

On the other hand, the default for Shopify/sarama is 1 byte.
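
For comparison, a sketch of the equivalent knobs on the Sarama side, set here explicitly to Sarama's documented defaults, which is why the Sarama spike above delivers messages immediately without any tuning:

func lowLatencyConsumerConfig() *sarama.Config {
    config := sarama.NewConfig()
    config.Consumer.Fetch.Min = 1                         // minimum bytes per fetch (Sarama default: 1)
    config.Consumer.MaxWaitTime = 250 * time.Millisecond  // broker-side wait cap (Sarama default: 250ms)
    return config
}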

References:

  • segmentio/kafka-go
  • Shopify/sarama
answered by pierDipi