I saw in a video tutorial that Kafka Broker supports 3 types of acknowledgement when producer posts a message.
0 - Fire and Forget
1 - Leader Ack
2 - Ack of all the brokers
I am using Kafka's Java API to post message. Is this something that has to be set for each broker using server.properties specific to each broker or is it something that has to be set by producer? If it has to be set by the producer , please explain how it can be set using Java API.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class KafkaProducerApp {
public static void main(String[] args){
Properties properties = new Properties();
properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String, String>(properties);
try{
for(int i=0;i<150;i++) {
RecordMetadata ack = kafkaProducer.send(new ProducerRecord<String, String>("replicated_topic", Integer.toString(i), "MyMessage" + Integer.toString(i))).get();
System.out.println(" Offset = " + ack.offset());
System.out.println(" Partition = " + ack.partition());
}
} catch (Exception ex){
ex.printStackTrace();
} finally {
kafkaProducer.close();
}
}
}
It's a producer property and is set similar to other properties you have in your code:
properties.put("acks","all");
The list of all configurable producer properties can be found here.
You might want to also look at the broker (or topic) property min.insync.replicas
that is related to this producer config.
I think you should understand the acks property what has actually done and look at the also behind scenes. If that is ok, you will see this property is configured by the producer.
For example, you mustn't lose any message like an audit log. The following code how we would start our producer config:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
props.put("acks", "all"); //We are using acks=all in order to get the strongest guarantee we can.
props.put("retries", "3");
props.put("max.in.flight.requests.per.connection", "5");
This is a small but powerful change that has a major impact on if a message is going to arrive or not.
This images that from Kafka In Action book which represents more clear for acks
property:
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With