I made kafka java producer. but console said error. kafka server is on aws. and producer is on my mac. and yet kara server is reachable. When i send message from producer, kafka server shows "Accepted connection .. ". What is problem?
1 [main] INFO kafka.utils.VerifiableProperties - Verifying properties
28 [main] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx:9092
28 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
137 [main] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host: xxxxxx,port:9092 with correlation id 0 for 1 topic(s) Set(words_topic)
189 [main] ERROR kafka.producer.SyncProducer - Producer connection to xxxxxx:9092 unsuccessful
198 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(words_topic)] from broker [id:0,host: xxxxxx,port:9092] failed
199 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics [Set(words_topic)] from broker [ArrayBuffer(id:0,host: xxxxxx,port:9092)] failed
And It's kafka console
[2015-01-27 05:23:33,767] DEBUG Accepted connection from /xxxxx on /xxxx:9092. sendBufferSize [actual|requested]: [212992|1048576] recvBufferSize [actual|requested]: [212992|1048576] (kafka.network.Acceptor)
[2015-01-27 05:23:33,767] DEBUG Processor 1 listening to new connection from /xxxx:65307 (kafka.network.Processor)
[2015-01-27 05:23:33,872] INFO Closing socket connection to /xxxx. (kafka.network.Processor)
[2015-01-27 05:23:33,873] DEBUG Closing connection from /xxxx:65307 (kafka.network.Processor)
This is my code.
Properties props = new Properties();
props.put("metadata.broker.list", "?????:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
// Now we break each word from the paragraph
for (String word :
METAMORPHOSIS_OPENING_PARA.split("\\s")) {
// Create message to be sent to "words_topic" topic with the word
KeyedMessage<String, String> data =
new KeyedMessage<String, String>
("words_topic", word);
// Send the message
producer.send(data);
}
System.out.println("Produced data");
// close the producer
producer.close();
}
// First paragraph from Franz Kafka's Metamorphosis
private static String METAMORPHOSIS_OPENING_PARA =
"One morning, when Gregor Samsa woke from troubled dreams, "
+ "he found himself transformed in his bed into a horrible "
+ "vermin. He lay on his armour-like back, and if he lifted "
+ "his head a little he could see his brown belly, slightly "
+ "domed and divided by arches into stiff sections.";
Unable to reach Kafka cluster The producer may fail to push message to a topic due to a network partition or unavailability of the Kafka cluster, in such cases there are high chances of messages being lost, hence we need a retry mechanism to avoid loss of data.
The source code for a Dead Letter Queue implementation contains a try-cath block to handle expected or unexpected exceptions. The message is processed if no error occurs. Send the message to a dedicated DLQ Kafka topic if any exception occurs. The failure cause should be added to the header of the Kafka message.
The retries setting determines how many times the producer will attempt to send a message before marking it as failed. The default values are: 0 for Kafka <= 2.0. MAX_INT, i.e., 2147483647 for Kafka >= 2.1.
I solve it
Set 'advertised.host.name' on server.properties of Kafka broker to server's realIP(same to producer's 'metadata.broker.list' property)
refrence : https://issues.apache.org/jira/browse/KAFKA-1092
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With