I have a set of Kafka broker instances running as a cluster. I have a client that is producing data to Kafka:
props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");
When I monitor using tcpdump, I can see that only the connections to broker1 and broker2 are ESTABLISHED, while for broker3 there is no connection from my producer. I have a single topic with just one partition.
My questions:
What is the relation between the number of brokers and the number of topic partitions? Should I always have number of brokers = number of partitions?
Why, in my case, am I not able to connect to broker3? Or at least, why does my network monitoring not show that a connection from my producer is established with broker3?
It would be great if I could get some deeper insight into how the connections to the brokers work from a producer standpoint.
Obviously, your producer does not need to connect to broker3 :)
I'll try to explain what happens when you produce data to Kafka:

1. You spin up your cluster, let's say 3 brokers, and create a topic foo with 2 partitions and replication factor 2. Quite a simple example, yet it could be a real case for someone.
2. You create a producer with metadata.broker.list (or bootstrap.servers in the new producer) configured to these brokers. Worth mentioning: you don't necessarily have to specify all the brokers in your cluster; in fact, you can specify only 1 of them and it will still work. I'll explain this in a bit too (there is also a minimal config sketch right after this list).
3. You send a message to topic foo using your producer.
4. The producer looks up its local metadata cache to see what brokers are leaders for each partition of foo and how many partitions your foo topic has. As this is the first send from this producer, the local cache contains nothing.
5. The producer sends a TopicMetadataRequest to each broker in metadata.broker.list sequentially until the first successful response. That's why I mentioned that 1 broker in that list would work, as long as it's alive.
6. The returned TopicMetadataResponse will contain the information about the requested topics, in your case foo, and the brokers in the cluster. Basically, this response contains the following:
- the list of brokers in the cluster (broker id, host and port);
- for each requested topic: the topic name, an error code, and per-partition metadata - the partition id, the leader broker, the replica brokers and the in-sync replicas.
7. Having the TopicMetadataResponse, your producer builds up its local cache and now knows exactly that a request for topic foo partition 0 should go to broker X.
8. The producer accumulates your message in a batch together with other messages headed for the same leader. When the batch is full or the linger.ms timeout passes, the producer flushes the batch to the broker. By "flushes" I mean "opens a new connection to the broker or reuses an existing one, and sends the ProduceRequest".
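To make the configuration part (step 2) concrete, here is a minimal sketch using the newer producer API. The class name, the hostname and the topic are placeholders for illustration, and note that listing a single broker in bootstrap.servers is enough:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FooProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // One live broker is enough to bootstrap; the rest of the cluster
        // is discovered through the metadata response (step 6 above).
        props.put("bootstrap.servers", "broker1:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Batching knob from step 8: flush a non-full batch after 5 ms.
        props.put("linger.ms", "5");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // The producer opens a produce connection only to the leader of the
        // partition this record is routed to.
        producer.send(new ProducerRecord<>("foo", "message-key", "message-value"));
        producer.close();
    }
}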
".The producer does not need to open unnecessary connections to all brokers, as the topic you are producing to may not be serviced by some brokers, and your cluster could be quite large. Imagine a 1000 broker cluster with lots of topics, but one of topics has just one partition - you only need that one connection, not 1000.
In your particular case I'm not 100% sure why you have 2 open connections to brokers given that you have just a single partition, but I assume one connection was opened during metadata discovery and kept around for reuse, and the second one is the actual broker connection used to produce data. However, I might be wrong here.
But in any case, there is no need at all to have a connection to the third broker.
Regarding your question "Should I always have number of brokers = number of partitions?" - the answer is most likely no. If you explain what you are trying to achieve, maybe I'll be able to point you in the right direction, but this is too broad to explain in general. I recommend reading this to clarify things.
UPD: to answer the question in the comments.
The metadata cache is updated in 2 cases:
If the producer fails to communicate with a broker for any reason - this includes the case when the broker is not reachable at all and the case when the broker responds with an error (like "I'm not the leader for this partition anymore, go away").
If no failures happen, the client still refreshes its metadata every metadata.max.age.ms (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42-L43) to discover new brokers and partitions on its own.
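For example, on the producer from the sketch above you could shorten that interval (the 30-second value here is arbitrary; the default is 5 minutes):

// Refresh cluster metadata every 30 seconds even when no failures occur
// (default: 300000 ms, i.e. 5 minutes).
props.put("metadata.max.age.ms", "30000");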