Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I produce messages with Kafka 8.2 API in Java?

I'm trying to work with the kafka API in java. I'm using the following maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.0</version>
</dependency>

I'm having trouble connecting to a remote kafka server. I changed the kafka 'server.properties' file port attribute to be port 8080. I can start both the zookeeper and the kafka server no problem. I can also use the console producer and consumer applications that came with the kafka download. (Scala 2.10 version)

I'm using the following client code to create a remote KafkaProducer

Properties propsProducer = new Properties();

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");
propsProducer.put("key.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
propsProducer.put("topic.metadata.refresh.interval.ms", "0");

KafkaProducer<byte[], byte[]> m_kafkaProducer = new KafkaProducer<byte[], byte[]>(propsProducer);

Once I've created the producer, I can run the following line and get valid topic info returned, granted strTopic is an existing topic name.

List<PartitionInfo> partitionInfo = m_kafkaProducer.partitionsFor(strTopic);

When I try to send a message, I do the following:

ProducerRecord<byte[], byte[]> prMessage = new ProducerRecord<byte[],byte[]>(strTopic, strMessage.getBytes());

RecordMetadata futureData = m_kafkaProducer.send(prMessage).get();

The call to send() blocks indefinitely and when I manually terminate the process, I see that the ERROR Closing socket because of error on kafka server(IOException, Connection Reset by Peer) error.

Also, it's worth nothing that the host.name, advertised.host.name, and advertised.port properties are all still commented out on the 'server.properties' file. Oh, and if I change the line:

propsProducer.put("bootstrap.servers", "172.xx.xx.xxx:8080");

to

propsProducer.put("bootstrap.servers", "127.0.0.1:8080");

and run it on the same server as where the kafka server is installed, it works but I'm trying to work with it remotely.

Appreciate any help and if I can clarify at all let me know.

like image 448
Patrick Hovsepian Avatar asked Oct 20 '22 15:10

Patrick Hovsepian


1 Answers

After lots of digging, I decided to implement the example found here: Kafka Producer Example. I shortened the code and didn't implement a partitioner class. I updated my pom with the dependency listed and I was still having the same issue. Ultimately, I made some configuration changes and everything worked.

The final piece of the puzzle was defining the Kafka server in /etc/hosts of both the server and the client machines. I added the following to both files.

172.xx.xx.xxx     serverHost1

Again, the x's are just masks. Then, I set the advertised.host.name in the server.properties file to serverHost1. NOTE: I got that IP after running an ifconfig on the server machine.

I changed the line

propsProducer.put("metadata.broker.list", "172.xx.xx.xxx:8080");

to

propsProducer.put("metadata.broker.list", "serverHost1:8080");

The Kafka API didn't like the fact that I was defining an IP as a string. Instead it was looking up the IP from within the etc/hosts file although the documentation says:

"Hostname the broker will advertise to producers and consumers. If not set, it uses the value for "host.name" if configured. Otherwise, it will use the value returned from java.net.InetAddress.getCanonicalHostName()."

Which will just return the IP, in the string form, I was previously using if not defined in etc/hosts of client machine, otherwise it returns the name paired with the IP (serverHost1 in my case). Also, I never did set the value of host.name either.

like image 176
Patrick Hovsepian Avatar answered Oct 24 '22 14:10

Patrick Hovsepian