I have built a small test environment on my Windows PC and wrote the following code to test Kafka (using kafka_2.10:0.9.0.1 from org.apache.kafka).
package iii.functiontesting;

import java.text.ParseException;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Hello world!
 *
 */
public class test4
{
    public static void main( String[] args ) throws ParseException
    {
        Properties producerProps=new Properties();
        producerProps.put("bootstrap.servers", "localhost:9092");
        producerProps.put("serializer.class",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("key.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("value.serializer",org.apache.kafka.common.serialization.StringSerializer.class.getName());
        producerProps.put("request.required.acks","1");
        KafkaProducer<String,String> kafkawriter= new KafkaProducer<String,String>(producerProps);
        ProducerRecord<String,String> msg=new ProducerRecord<>("TEST3","ImKey","teststring1");
        kafkawriter.send(msg);
    }
}
I use the following command to check whether the message was written to the topic correctly:
D:\Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\kafka-console-consumer.bat --zookeeper localhost:2181 --topic TEST3 --from-beginning
However, kafka-console-consumer shows nothing.
I suspected that my Kafka server was not running properly, so I tested it with the console producer:
D:\Work\kafkaenv\kafka_2.10-0.9.0.1\bin\windows>.\kafka-console-producer.bat --broker-list localhost:9092 --topic TEST3
aaaaa
This time aaaaa shows up in the console consumer, so the server itself works. I cannot figure out why the message from my Java code never arrives. Can anyone help me?
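You have to call either KafkaProducer#flush or KafkaProducer#close before the program terminates.
The producer buffers records before sending them to the broker, and send() returns as soon as the record has been placed in that buffer. Your main method therefore exits before anything has been transmitted. See buffer.memory and batch.size in the Kafka producer configuration.
In the code from the question: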
kafkawriter.send(msg);
kafkawriter.close();
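To make the buffering effect concrete, here is a minimal, library-free sketch. BufferedSenderSketch and its methods are hypothetical names invented for illustration; this is not the Kafka API, and the real producer works differently (it uses a background I/O thread and also sends on a timer). The sketch only assumes the general "accumulate first, transmit later" behaviour described above.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical stand-in for a buffering producer. Not the Kafka API:
 * it only mimics the "accumulate first, transmit later" behaviour.
 */
final class BufferedSenderSketch implements AutoCloseable {
    private final int batchSize;                               // records per batch, loosely analogous to batch.size
    private final List<String> pending = new ArrayList<>();    // accepted but not yet transmitted
    private final List<String> delivered = new ArrayList<>();  // stands in for the broker's log

    BufferedSenderSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Accepts a record; transmits only once a full batch has accumulated. */
    void send(String record) {
        pending.add(record);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Transmits everything still waiting in the buffer. */
    void flush() {
        delivered.addAll(pending);
        pending.clear();
    }

    /** Flushes outstanding records before shutting down. */
    @Override
    public void close() {
        flush();
    }

    /** Returns a copy of what has actually been transmitted so far. */
    List<String> delivered() {
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        BufferedSenderSketch sender = new BufferedSenderSketch(3);
        sender.send("teststring1");
        // Exiting here would lose the record: it is still in the buffer.
        System.out.println("before close: " + sender.delivered());
        sender.close();
        System.out.println("after close:  " + sender.delivered());
    }
}
```

Running main prints an empty list before close() and a list containing teststring1 after it, which mirrors what the console consumer showed for the original program.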