Apache Kafka Producer Broker Connection

Tags:

apache-kafka

I have a set of Kafka broker instances running as a cluster. I have a client that is producing data to Kafka:

props.put("metadata.broker.list", "broker1:9092,broker2:9092,broker3:9092");

When I monitor with tcpdump, I can see that only the connections to broker1 and broker2 are ESTABLISHED, while for broker3 there is no connection from my producer. I have a single topic with just one partition.

My questions:

  1. What is the relationship between the number of brokers and the number of topic partitions? Should I always have the number of brokers equal to the number of partitions?

  2. Why, in my case, am I not able to connect to broker3? Or at least, why does my network monitoring not show a connection from my producer to broker3?

It would be great if I could get some deeper insight into how connections to the brokers work from a producer's standpoint.

asked Aug 11 '16 by joesan

1 Answer

Obviously, your producer does not need to connect to broker3 :)

I'll try to explain what happens when you produce data to Kafka:

  1. You spin up some brokers, let's say 3, then create some topic foo with 2 partitions and a replication factor of 2. A quite simple example, yet it could be a real case for someone.
  2. You create a producer with metadata.broker.list (or bootstrap.servers in the new producer) configured to these brokers. Worth mentioning: you don't necessarily have to specify all the brokers in your cluster; in fact, you can specify only one of them and it will still work. I'll explain this in a bit too.
  3. You send a message to topic foo using your producer.
  4. The producer looks up its local metadata cache to see which brokers are leaders for each partition of topic foo and how many partitions your foo topic has. As this is the producer's first send, the local cache contains nothing.
  5. The producer sends a TopicMetadataRequest to each broker in metadata.broker.list sequentially until the first successful response. That's why I mentioned that a single broker in that list would work, as long as it's alive.
  6. The returned TopicMetadataResponse will contain information about the requested topics (in your case, foo) and the brokers in the cluster. Basically, this response contains the following:
    • a list of brokers in the cluster, where each broker has an ID, host and port. This list may not contain every broker in the cluster, but it should at least contain the brokers responsible for servicing the topic in question.
    • a list of topic metadata, where each entry has the topic name, the number of partitions, the leader broker ID for each partition and the ISR broker IDs for each partition.
  7. Based on the TopicMetadataResponse, your producer builds up its local cache and now knows exactly that a request for topic foo partition 0 should go to broker X.
  8. Based on the number of partitions in the topic, the producer partitions your message and accumulates it, knowing that it should be sent as part of a batch to some broker.
  9. When the batch is full or the linger.ms timeout passes, your producer flushes the batch to the broker. By "flushes" I mean "opens a new connection to the broker, or reuses an existing one, and sends the ProduceRequest". (A code sketch of this whole flow follows the list.)
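
To make steps 2-9 concrete, here is a minimal sketch using the current Java producer API (the bootstrap.servers style mentioned in step 2). The broker hostnames and topic foo come from the question; the serializers and the linger.ms value are illustrative assumptions, not part of the original setup:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class FooProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // The brokers listed here are only used for the initial metadata
            // discovery (step 5); a single live broker would be enough.
            props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // How long the producer waits for a batch to fill before flushing (step 9).
            props.put("linger.ms", "5");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // The first send triggers the metadata lookup (steps 4-6); later
                // sends go straight to the cached partition leader (step 7).
                producer.send(new ProducerRecord<>("foo", "some-key", "some-value"));
            }
        }
    }

Note that listing all three brokers in bootstrap.servers is just a convenience for discovery; as described above, it does not mean the producer will open a connection to each of them.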

The producer does not need to open unnecessary connections to all brokers, as the topic you are producing to may not be serviced by some brokers, and your cluster could be quite large. Imagine a 1000-broker cluster with lots of topics, where one of the topics has just one partition: you only need that one connection, not 1000.

In your particular case I'm not 100% sure why you have 2 open connections to brokers when you have just a single partition, but I assume one connection was opened during metadata discovery and was cached for reuse, and the second one is the actual broker connection used to produce data. However, I might be wrong here.

But anyway, there is no need at all for a connection to the third broker.

Regarding your question "Should I always have the number of brokers equal to the number of partitions?", the answer is most likely no. The partition count is chosen for parallelism and throughput, independently of the broker count; only the replication factor is capped by the number of brokers. If you explain what you are trying to achieve, maybe I'll be able to point you in the right direction, but this is too broad to explain in general. I recommend reading this to clarify things.
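
To make that independence concrete, here is a sketch using the Java AdminClient (which appeared in the clients library after this answer was written, so treat it as a present-day illustration; the partition and replication counts are made up):

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateFooTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");

            try (AdminClient admin = AdminClient.create(props)) {
                // 12 partitions on a 3-broker cluster is perfectly legal: each
                // broker simply leads several partitions. The replication factor
                // (2 here) is what cannot exceed the number of brokers.
                NewTopic topic = new NewTopic("foo", 12, (short) 2);
                admin.createTopics(Collections.singleton(topic)).all().get();
            }
        }
    }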

UPD, to answer the question in the comments:

Metadata cache is updated in 2 cases:

  1. If the producer fails to communicate with a broker for any reason. This includes both the case when the broker is not reachable at all and the case when the broker responds with an error (like "I'm not the leader for this partition anymore, go away").

  2. If no failures happen, the client still refreshes its metadata every metadata.max.age.ms (https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java#L42-L43) to discover new brokers and partitions on its own (see the snippet after this list).
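
For illustration, in the same fragment style as the config line in the question (the 30-second value is an arbitrary assumption):

    // Assumes java.util.Properties, as in the question's snippet.
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");
    // Refresh cluster metadata every 30 seconds instead of the default
    // 300000 ms (5 minutes), so new brokers and partitions are picked
    // up sooner even when no send fails.
    props.put("metadata.max.age.ms", "30000");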

answered by serejja