
Creating Kafka topic in sarama

Tags:

sarama

Is it possible to create a Kafka topic in sarama? I know the Java API enables you to create topics, but I couldn't find any information on how to do that in sarama. If it's possible, an example or an explanation of which API I should use would be great. Thanks in advance.

Mohsen Shakiba asked May 21 '17 08:05



3 Answers

Indeed, in newer versions of Sarama you can use ClusterAdmin to create topics. Below you can find the sample code:

package main

import (
    "log"

    "github.com/Shopify/sarama" // Sarama 1.22.0
)

func main() {
    brokerAddrs := []string{"localhost:9092"}
    config := sarama.NewConfig()
    config.Version = sarama.V2_1_0_0
    admin, err := sarama.NewClusterAdmin(brokerAddrs, config)
    if err != nil {
        log.Fatal("Error while creating cluster admin: ", err.Error())
    }
    defer func() { _ = admin.Close() }()
    err = admin.CreateTopic("topic.test.1", &sarama.TopicDetail{
        NumPartitions:     1,
        ReplicationFactor: 1,
    }, false)
    if err != nil {
        log.Fatal("Error while creating topic: ", err.Error())
    }
}

Daniel Pacak answered Oct 06 '22 02:10


EDIT: Below is an old answer which still works, but when it was written the sarama admin APIs were still under development. Since then the ClusterAdmin APIs have come a long way, and today they should be treated as the preferred way to solve this problem. Refer to the other two answers if you are looking to solve this in 2020+.


It is possible to use sarama for managing topics in Kafka. I am writing a Terraform provider for managing Kafka topics and use sarama to do the heavy lifting in the backend.

You need to use the sarama.Broker APIs to do this. For example:

package main

import (
    "log"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    // Set broker configuration
    broker := sarama.NewBroker("localhost:9092")

    // Additional configurations. Check sarama doc for more info
    config := sarama.NewConfig()
    config.Version = sarama.V1_0_0_0

    // Open broker connection with configs defined above
    if err := broker.Open(config); err != nil {
        log.Fatal("Error while opening broker connection: ", err)
    }
    // Close connection to broker when main returns
    defer func() { _ = broker.Close() }()

    // Check if the connection was OK
    connected, err := broker.Connected()
    if err != nil {
        log.Print(err.Error())
        return
    }
    log.Print(connected)

    // Set up the topic details in the CreateTopicsRequest struct
    topic := "blah25s"
    topicDetail := &sarama.TopicDetail{
        NumPartitions:     1,
        ReplicationFactor: 1,
        ConfigEntries:     make(map[string]*string),
    }
    topicDetails := map[string]*sarama.TopicDetail{topic: topicDetail}

    request := sarama.CreateTopicsRequest{
        Timeout:      time.Second * 15,
        TopicDetails: topicDetails,
    }

    // Send request to the broker (it must be the cluster controller)
    response, err := broker.CreateTopics(&request)

    // Handle errors if any; response is nil when err is set
    if err != nil {
        log.Printf("%#v", err)
        return
    }
    for name, topicErr := range response.TopicErrors {
        log.Printf("Key is %s", name)
        log.Printf("Value is %#v", topicErr.Err.Error())
        if topicErr.ErrMsg != nil {
            log.Printf("Value3 is %#v", *topicErr.ErrMsg)
        }
    }
    log.Printf("the response is %#v", response)
}

You can have a look at working code on GitHub. Remember to start the Kafka broker and import all Go dependencies before running the code.

Prakhar answered Oct 06 '22 02:10


It is better to use the ClusterAdmin API in https://github.com/Shopify/sarama/blob/master/admin.go for this instead of connecting directly to a broker.

This handles a lot of cases, such as:

  1. You can add multiple broker addresses for a cluster config.
  2. Identification of which broker acts as the controller is done automatically.

Nitin K answered Oct 06 '22 00:10