
Java Kafka producer error

I wrote a Kafka producer in Java, but the console shows an error. The Kafka server is on AWS and the producer runs on my Mac. The Kafka server is reachable: when I send a message from the producer, the Kafka server logs "Accepted connection .. ". What is the problem?

1 [main] INFO kafka.utils.VerifiableProperties - Verifying properties
28 [main] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to xxxxxx:9092
28 [main] INFO kafka.utils.VerifiableProperties - Property serializer.class is overridden to kafka.serializer.StringEncoder
137 [main] INFO kafka.client.ClientUtils$ - Fetching metadata from broker id:0,host: xxxxxx,port:9092 with correlation id 0 for 1 topic(s) Set(words_topic)
189 [main] ERROR kafka.producer.SyncProducer - Producer connection to xxxxxx:9092 unsuccessful
198 [main] WARN kafka.client.ClientUtils$ - Fetching topic metadata with correlation id 0 for topics [Set(words_topic)] from broker [id:0,host: xxxxxx,port:9092] failed
199 [main] ERROR kafka.utils.Utils$ - fetching topic metadata for topics [Set(words_topic)] from broker [ArrayBuffer(id:0,host: xxxxxx,port:9092)] failed

And this is the Kafka server console output:

[2015-01-27 05:23:33,767] DEBUG Accepted connection from /xxxxx on /xxxx:9092. sendBufferSize [actual|requested]: [212992|1048576] recvBufferSize [actual|requested]: [212992|1048576] (kafka.network.Acceptor)
[2015-01-27 05:23:33,767] DEBUG Processor 1 listening to new connection from /xxxx:65307 (kafka.network.Processor)
[2015-01-27 05:23:33,872] INFO Closing socket connection to /xxxx. (kafka.network.Processor)
[2015-01-27 05:23:33,873] DEBUG Closing connection from /xxxx:65307 (kafka.network.Processor)

This is my code.

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    // The class name is a placeholder; the original snippet did not include it.
    public class WordsProducer {

        // First paragraph from Franz Kafka's Metamorphosis
        private static final String METAMORPHOSIS_OPENING_PARA =
                "One morning, when Gregor Samsa woke from troubled dreams, "
                        + "he found himself transformed in his bed into a horrible "
                        + "vermin. He lay on his armour-like back, and if he lifted "
                        + "his head a little he could see his brown belly, slightly "
                        + "domed and divided by arches into stiff sections.";

        public static void main(String[] args) {
            Properties props = new Properties();
            // Broker used to fetch the initial topic metadata
            props.put("metadata.broker.list", "?????:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            ProducerConfig config = new ProducerConfig(props);
            Producer<String, String> producer = new Producer<String, String>(config);

            // Split the paragraph into words and send each one
            for (String word : METAMORPHOSIS_OPENING_PARA.split("\\s")) {
                // Create a message for the "words_topic" topic containing the word
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("words_topic", word);
                // Send the message
                producer.send(data);
            }
            System.out.println("Produced data");
            // Close the producer
            producer.close();
        }
    }
Rabbit Summer asked Jan 27 '15 05:01



1 Answer

I solved it.

Set 'advertised.host.name' in the Kafka broker's server.properties to the server's real IP (the same address used in the producer's 'metadata.broker.list' property).

Reference: https://issues.apache.org/jira/browse/KAFKA-1092
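To confirm from the producer machine that the address set in 'advertised.host.name' is actually reachable, a plain TCP check can help. The following is only a minimal sketch using the standard Java socket API; the class name BrokerReachability, the method canConnect, and the default host, port, and timeout values are all made up for illustration and are not part of Kafka.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Kafka: checks plain TCP reachability only.
public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // Fails with an IOException if the host cannot be resolved,
            // the connection is refused, or the timeout expires.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults; pass the advertised host and port as arguments.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: "
                + canConnect(host, port, 3000));
    }
}
```

Running it with the broker's advertised host and port 9092 as arguments prints whether a connection could be opened. A successful connection only shows that the port is open from the client side; it does not prove the broker is configured correctly.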

Rabbit Summer answered Oct 05 '22 23:10