As we know, topic creation in Kafka is usually handled during server initialization. There we use the default script ./kafka-topics --zookeeper ..., but what if we need to create a topic dynamically?
Fortunately, Kafka 0.10.1.0 brought us this ability. I saw this fascinating feature on the Confluence Jira board but couldn't find any documentation related to the topic. Ironic, isn't it?
So I went to the source code and found a way of creating topics on the fly. Hopefully it will be helpful for some of you. Of course, if you have a better solution, please do not hesitate to share it with us.
Ok, let's start.
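import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;

/** Creates every topic listed in mTopics and returns the names of those that were created successfully. */
public List<String> propagateTopics(int partitions, short replication, int timeout) {
    CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(partitions, replication);
    Map<String, CreateTopicsRequest.TopicDetails> topicConfig = mTopics.stream()
            .collect(Collectors.toMap(k -> k, v -> topicDetails)); // 1
    CreateTopicsRequest request = new CreateTopicsRequest(topicConfig, timeout); // 2
    try {
        CreateTopicsResponse response = createTopic(request, BOOTSTRAP_SERVERS_CONFIG); // 3
        return response.errors().entrySet().stream()
                .filter(error -> error.getValue() == Errors.NONE)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList()); // 4
    } catch (IOException e) {
        log.error("Failed to create topics", e);
    }
    // The request failed, so no topic is confirmed as created; return an empty list rather than null.
    return Collections.emptyList();
}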
1. We need an instance of TopicDetails. For simplicity, I share the same configuration among all topics. Assume that mTopics is your list of Strings with the names of all the topics you want to create.
2. Basically, we want to send a request to our Kafka cluster, and there is now a special class for that, CreateTopicsRequest, which accepts the topic configuration map and a timeout.
3. Then we need to send the request and get the CreateTopicsResponse.
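import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.ResponseHeader;

private static final short apiKey = ApiKeys.CREATE_TOPICS.id;
private static final short version = 0;
private static final short correlationId = -1;

private static CreateTopicsResponse createTopic(CreateTopicsRequest request, String client) throws IllegalArgumentException, IOException {
    // Expects a single "host:port" entry, not a comma-separated list of servers.
    String[] comp = client.split(":");
    if (comp.length != 2) {
        throw new IllegalArgumentException("Wrong client directive");
    }
    String address = comp[0];
    int port = Integer.parseInt(comp[1]);

    // Serialize the request header followed by the request body into one buffer.
    RequestHeader header = new RequestHeader(apiKey, version, client, correlationId);
    ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + request.sizeOf());
    header.writeTo(buffer);
    request.writeTo(buffer);
    byte[] byteBuf = buffer.array();

    byte[] resp = requestAndReceive(byteBuf, address, port);

    // The response header has to be consumed before the response body can be parsed.
    ByteBuffer respBuffer = ByteBuffer.wrap(resp);
    ResponseHeader.parse(respBuffer);
    return CreateTopicsResponse.parse(respBuffer);
}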
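import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.net.Socket;

private static byte[] requestAndReceive(byte[] buffer, String address, int port) throws IOException {
    // Any IOException is propagated to the caller: returning an empty array here
    // would only make the response parsing in createTopic fail later on.
    try (Socket socket = new Socket(address, port);
         DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
         DataInputStream dis = new DataInputStream(socket.getInputStream())
    ) {
        // Write the 4-byte size prefix, then the serialized request.
        dos.writeInt(buffer.length);
        dos.write(buffer);
        dos.flush();
        // Read the 4-byte size prefix of the response, then exactly that many bytes.
        byte[] resp = new byte[dis.readInt()];
        dis.readFully(resp);
        return resp;
    }
}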
There is no magic here at all: we just send the request and then parse the byte stream into the response.
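To make the framing used by requestAndReceive easier to see, here is a minimal sketch that runs without a broker. It only illustrates the 4-byte length prefix that writeInt and readInt exchange around the payload. The class FramingSketch and its methods frame and unframe are hypothetical names made up for this illustration and are not part of Kafka.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the Kafka API.
public class FramingSketch {

    /** Prefixes the payload with its length as a 4-byte big-endian int. */
    public static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Reads one frame: a 4-byte length followed by exactly that many bytes. */
    public static byte[] unframe(byte[] framed) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload); // throws EOFException if the frame is truncated
            return payload;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        byte[] framed = frame(payload);
        // The frame is the 4-byte prefix plus the 5-byte payload.
        System.out.println(framed.length);
        System.out.println(new String(unframe(framed), StandardCharsets.UTF_8));
    }
}
```

Because readFully fails on a short read instead of returning a partial array, a truncated response surfaces as an exception rather than as a corrupted buffer handed to the parser.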
4. CreateTopicsResponse has a property errors, which is just a Map<String, Errors> where the key is the topic name you requested. The tricky thing is that it contains all the topics you requested, and those with no errors have the value Errors.NONE. That's why I filter the response and return only the successfully created topics.
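If you also want to know which topics failed, one option is to split the errors map in two instead of filtering it. The sketch below shows the idea with the standard library only. The Status enum is a hypothetical stand-in for Kafka's Errors so that the example runs without the Kafka jar, and TopicResultsSketch and partitionByOutcome are names invented for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the Kafka API.
public class TopicResultsSketch {

    /** Stand-in for Kafka's Errors enum (assumption: only the values used here). */
    public enum Status { NONE, TOPIC_ALREADY_EXISTS, INVALID_PARTITIONS }

    /** Splits topic names into created (key true) and failed (key false). */
    public static Map<Boolean, List<String>> partitionByOutcome(Map<String, Status> errors) {
        return errors.entrySet().stream()
                .collect(Collectors.partitioningBy(
                        e -> e.getValue() == Status.NONE,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }

    public static void main(String[] args) {
        Map<String, Status> errors = new LinkedHashMap<>();
        errors.put("orders", Status.NONE);
        errors.put("payments", Status.TOPIC_ALREADY_EXISTS);

        Map<Boolean, List<String>> outcome = partitionByOutcome(errors);
        System.out.println("created: " + outcome.get(true));
        System.out.println("failed: " + outcome.get(false));
    }
}
```

partitioningBy always returns entries for both true and false, so neither lookup needs a null check even when every topic succeeded or every topic failed.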
Extending Andrei Nechaev's answer:
With 0.10.2.0, the way to get an instance of CreateTopicsRequest has changed a bit. We need to use the Builder inner class to build a CreateTopicsRequest instance. Here is a code sample.
CreateTopicsRequest.Builder builder = new CreateTopicsRequest.Builder(topicConfig, timeout, false);
CreateTopicsRequest request = builder.build();