
Apache Kafka create topic from code [duplicate]

As we know, topic creation in Kafka is normally handled during server initialization, using the default script ./kafka-topics --zookeeper .... But what if we need to create a topic dynamically, from code?

Andrei Nechaev asked Dec 04 '16 18:12



2 Answers

Fortunately, Kafka 0.10.1.0 brought us this ability. I saw this fascinating feature on the Confluence Jira board but couldn't find any documentation on the topic. Ironic, isn't it?

So I went to the source code and found a way to create topics on the fly. Hopefully it will be helpful for some of you. Of course, if you have a better solution, please do not hesitate to share it with us.

Ok, let's start.

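import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;

/** Creates every topic in mTopics and returns the names of those created successfully. */
public List<String> propagateTopics(int partitions, short replication, int timeout) throws IOException {
    CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication);
    Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream()
            .collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1

    CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2

    try {
        // BOOTSTRAP_SERVERS_CONFIG is assumed to be your own constant holding "host:port"
        CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3
        return response.errors().entrySet().stream()
                .filter(error -> error.getValue() == Errors.NONE)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList()); // 4
    } catch (IOException e) {
        log.error(e);
    }

    // Nothing was confirmed as created; an empty list is safer for callers than null
    return Collections.emptyList();
}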

1 We need an instance of TopicDetails. For simplicity, I share the same configuration among all topics. Assume that mTopics is the list of names (Strings) of all the topics you want to create.
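One caveat with step 1: Collectors.toMap throws an IllegalStateException when it meets a duplicate key, so a repeated name in mTopics would abort the whole call. Below is a minimal sketch of a defensive variant. It assumes a hypothetical Details class as a stand-in for CreateTopicsRequest.TopicDetails, so that it runs without the Kafka library; the class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class SharedTopicConfig {

    /** Hypothetical stand-in for CreateTopicsRequest.TopicDetails. */
    static final class Details {
        final int partitions;
        final short replication;

        Details(int partitions, short replication) {
            this.partitions = partitions;
            this.replication = replication;
        }
    }

    /** Maps every distinct topic name to the same shared Details instance. */
    static Map<String, Details> shareAmong(List<String> topics, Details details) {
        return topics.stream()
                .distinct() // drop repeated names so toMap sees each key once
                .collect(Collectors.toMap(
                        name -> name,
                        name -> details,
                        (first, second) -> first, // not reached after distinct(); required by this overload
                        LinkedHashMap::new));     // keep the order of the input list
    }

    public static void main(String[] args) {
        Details details = new Details(3, (short) 1);
        Map<String, Details> config =
                shareAmong(Arrays.asList("orders", "payments", "orders"), details);
        System.out.println(config.keySet()); // prints [orders, payments]
    }
}
```

Whether silently dropping duplicates is the right policy depends on your application; failing fast, as the plain two-argument toMap does, may be preferable if a duplicate indicates a configuration mistake.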

2 Basically, we want to send a request to our Kafka cluster. There is now a special class for that, CreateTopicsRequest, which accepts the topic configuration map and a timeout.

3 Then we need to send the request and get the CreateTopicsResponse.

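import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;

private static final short apiKey = ApiKeys.CREATE_TOPICS.id;
private static final short version = 0;
private static final short correlationId = -1;

private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException {
    // client is expected in the form "host:port"
    String[] comp = client.split(":");
    if (comp.length != 2) {
        throw new IllegalArgumentException("Wrong client directive");
    }
    String address = comp[0];
    int port = Integer.parseInt(comp[1]);

    // Serialize the request header followed by the request body
    RequestHeader header = new RequestHeader(apiKey, version, client, correlationId);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);

    byte[] byteBuf = buffer.array();

    byte[] resp = requestAndReceive(byteBuf, address, port);
    ByteBuffer respBuffer = ByteBuffer.wrap(resp);
    ResponseHeader.parse(respBuffer); // consume the response header before parsing the body

    return CreateTopicsResponse.parse(respBuffer);
}

private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException {
    // Let IOException propagate: returning an empty array here would only
    // make the response parsing in createTopic fail later with a less clear error.
    try (Socket socket = new Socket(address, port);
         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
         DataInputStream dis = new DataInputStream(socket.getInputStream())
    ) {
        // Each message on the wire is a 4-byte length followed by the payload
        dos.writeInt(buffer.length);
        dos.write(buffer);
        dos.flush();

        byte[] resp = new byte[dis.readInt()];
        dis.readFully(resp);

        return resp;
    }
}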

There is no magic here at all: we just send the request and then parse the byte stream into the response.
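The framing that requestAndReceive relies on is a 4-byte big-endian length followed by exactly that many payload bytes, in both directions. Here is a minimal, Kafka-free sketch of that framing over in-memory streams instead of a socket. The class and method names (LengthPrefixedFrames, writeFrame, readFrame) are made up for illustration and are not part of the Kafka client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedFrames {

    /** Writes a 4-byte big-endian length followed by the payload. */
    static void writeFrame(OutputStream out, byte[] payload) throws IOException {
        DataOutputStream dos = new DataOutputStream(out);
        dos.writeInt(payload.length);
        dos.write(payload);
        dos.flush();
    }

    /** Reads one frame: the length prefix, then exactly that many bytes. */
    static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream dis = new DataInputStream(in);
        int length = dis.readInt();
        if (length < 0) {
            throw new IOException("Negative frame length: " + length);
        }
        byte[] payload = new byte[length];
        dis.readFully(payload); // throws EOFException if the stream ends early
        return payload;
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        writeFrame(wire, "hello".getBytes(StandardCharsets.UTF_8));

        byte[] received = readFrame(new ByteArrayInputStream(wire.toByteArray()));
        System.out.println(new String(received, StandardCharsets.UTF_8)); // prints "hello"
    }
}
```

Using readFully rather than a single read call matters on a real socket, because one read may return fewer bytes than the frame contains.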

4 CreateTopicsResponse has an errors property, which is just a Map<String, Errors> keyed by the topic names you requested. The tricky part is that it contains every topic you requested; those created without errors have the value Errors.NONE. That's why I filter the response and return only the successfully created topics.
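Returning only the successful names discards the reason the other topics failed. If you need both groups, one option is to partition the map instead of filtering it. The sketch below assumes a hypothetical TopicError enum as a stand-in for Kafka's Errors, with only three values modelled, so that it runs without the Kafka library; the class and method names are also made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class TopicResults {

    /** Hypothetical stand-in for Kafka's Errors enum; only three values are modelled. */
    enum TopicError { NONE, TOPIC_ALREADY_EXISTS, INVALID_REPLICATION_FACTOR }

    /** Splits topic names into created (key true) and failed (key false). */
    static Map<Boolean, List<String>> partitionByOutcome(Map<String, TopicError> errors) {
        return errors.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        entry -> entry.getValue() == TopicError.NONE,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<String, TopicError> errors = new LinkedHashMap<>();
        errors.put("orders", TopicError.NONE);
        errors.put("payments", TopicError.TOPIC_ALREADY_EXISTS);
        errors.put("audit", TopicError.NONE);

        Map<Boolean, List<String>> outcome = partitionByOutcome(errors);
        System.out.println("created: " + outcome.get(true));  // prints created: [orders, audit]
        System.out.println("failed:  " + outcome.get(false)); // prints failed:  [payments]
    }
}
```

partitioningBy always produces both the true and the false key, so callers can read either list without a null check.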

Andrei Nechaev answered Oct 01 '22 13:10



Extending Andrei Nechaev's answer:

With 0.10.2.0, the way to get an instance of CreateTopicsRequest has changed a bit. We need to use the Builder inner class to build a CreateTopicsRequest instance. Here is a code sample.

CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false);
CreateTopicsRequest request = builder.build();
MedianMidget answered Oct 01 '22 13:10
