Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How to create Kafka topic using Segmentio's kafka-go?

Can I get an example of creating a topic using segmentio's kafka-go?

I have tried creating a topic as below:

c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)

But this will work only if the given host:port is Kafka Leader. If the host:port is not Kafka Leader, then I will be getting this error:

Not Controller: this is not the correct controller for this cluster*

What is the right way of passing clusters address for creating a topic?

Kafka Segmentio: github.com/segmentio/kafka-go

like image 888
prabhu Avatar asked Nov 15 '25 13:11

prabhu


2 Answers

Like shmsr said - you need to get the Leader connection to create topics. You can do that in the following way:

conn, err := kafka.Dial("tcp", "host:port")
if err != nil {
    panic(err.Error())
}
defer conn.Close()

controller, err := conn.Controller()
if err != nil {
    panic(err.Error())
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err.Error())
}
defer controllerConn.Close()

topicConfigs := []kafka.TopicConfig{{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}}

err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
    panic(err.Error())
}
like image 186
Toivo Lainevool Avatar answered Nov 17 '25 08:11

Toivo Lainevool


This is what you need:

func (c *Conn) Controller() (broker Broker, err error)
// Controller requests kafka for the current controller and returns its URL

When you open the connection in your code using Dial, you are randomly picking one of the brokers in the cluster. Hence, you may/ may not end up on the actual Kafka controller. A simple lookup for the controller and opening a new connection should be helpful.

https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller

like image 24
shmsr Avatar answered Nov 17 '25 09:11

shmsr