Can I get an example of creating a topic using segmentio's kafka-go?
I have tried creating a topic as below:
c, _ := kafka.Dial("tcp", "host:port")
kt := kafka.TopicConfig{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}
e := c.CreateTopics(kt)
But this will work only if the given host:port is Kafka Leader. If the host:port is not Kafka Leader, then I will be getting this error:
Not Controller: this is not the correct controller for this cluster*
What is the right way of passing clusters address for creating a topic?
Kafka Segmentio: github.com/segmentio/kafka-go
Like shmsr said - you need to get the Leader connection to create topics. You can do that in the following way:
conn, err := kafka.Dial("tcp", "host:port")
if err != nil {
panic(err.Error())
}
defer conn.Close()
controller, err := conn.Controller()
if err != nil {
panic(err.Error())
}
controllerConn, err := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err.Error())
}
defer controllerConn.Close()
topicConfigs := []kafka.TopicConfig{{Topic: "sometopic", NumPartitions: 1, ReplicationFactor: 1}}
err = controllerConn.CreateTopics(topicConfigs...)
if err != nil {
panic(err.Error())
}
This is what you need:
func (c *Conn) Controller() (broker Broker, err error)
// Controller requests kafka for the current controller and returns its URL
When you open the connection in your code using Dial, you are randomly picking one of the brokers in the cluster. Hence, you may/ may not end up on the actual Kafka controller. A simple lookup for the controller and opening a new connection should be helpful.
https://pkg.go.dev/github.com/segmentio/kafka-go?tab=doc#Conn.Controller
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With