I am running the Kafka producer example given on the Kafka site.
The code:
import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {
    public static void main(String[] args) {
        // Number of events to send, taken from the first command-line argument.
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "host.broker-1:9093, host.broker-2:9093, host.broker-3:9095");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "test.app.SimplePartitioner");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);

        Producer<String, String> producer = new Producer<String, String>(config);
        for (long nEvents = 0; nEvents < events; nEvents++) {
            long runtime = new Date().getTime();
            String ip = "192.168.2." + rnd.nextInt(255);
            String msg = runtime + ",www.example.com," + ip;
            // The IP is the message key; the partitioner uses it to pick a partition.
            KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
            producer.send(data);
        }
        producer.close();
    }
}
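// Package matches the partitioner.class setting (test.app.SimplePartitioner).
package test.app;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
    // Kafka instantiates the partitioner reflectively through this constructor.
    public SimplePartitioner(VerifiableProperties props) {
    }

    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
            // Use the last octet of the IP, modulo the partition count.
            partition = Integer.parseInt(stringKey.substring(offset + 1)) % a_numPartitions;
        }
        return partition;
    }
}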
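To see which partition a given key would land on, the arithmetic in the partitioner can be exercised without a broker. The following is only a minimal sketch: the class name PartitionDemo and the method partitionFor are hypothetical and simply mirror the logic of SimplePartitioner above.

```java
// Hypothetical stand-alone demo; mirrors the arithmetic in SimplePartitioner
// so it can be run without Kafka on the classpath.
final class PartitionDemo {
    // Parses the text after the last '.' in the key and takes it modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        int partition = 0;
        int offset = key.lastIndexOf('.');
        if (offset > 0) {
            partition = Integer.parseInt(key.substring(offset + 1)) % numPartitions;
        }
        return partition;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("192.168.2.77", 5)); // 77 % 5 = 2
        System.out.println(partitionFor("192.168.2.77", 1)); // any value % 1 = 0
    }
}
```

With a single partition every key maps to partition 0, so the custom partitioner only has a visible effect if the page_visits topic has more than one partition.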
More details:
I am running this application on a host (call it producer) that is remote from host-broker[1-3].
I can ping and ssh to the broker hosts from the producer host.
I have set advertised.host.name in server.properties on each broker (the files are named server[1-3].properties on the respective brokers).
The properties:
broker.id=1
port=9093
host.name=host.broker.internal.name
advertised.host.name=host-broker1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/1/kafka-logs-1,/data/2/kafka-logs-2
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
zookeeper.connection.timeout.ms=6000
Any idea on how to fix this error?
You can deal with transient send failures in several ways: drop the failed messages; exert backpressure further up the application and retry the sends; or write all messages to alternative local storage, from which they are ingested into Kafka asynchronously.
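The retry and local-storage options can be combined, roughly as sketched below. Everything here is an assumption made for illustration: RetryingSender, its Sender interface, and the in-memory fallback queue are hypothetical names, not part of the Kafka API, and a real fallback would normally be durable storage rather than a queue in memory.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: retry a failing send with a growing pause, then park
// the message in a local fallback queue instead of dropping it.
final class RetryingSender {
    // Stand-in for whatever actually sends the message (for example a wrapper around producer.send).
    interface Sender {
        void send(String message) throws Exception;
    }

    private final Sender sender;
    private final int maxAttempts;
    private final long initialBackoffMs;
    private final Queue<String> fallback = new ArrayDeque<String>();

    RetryingSender(Sender sender, int maxAttempts, long initialBackoffMs) {
        this.sender = sender;
        this.maxAttempts = maxAttempts;
        this.initialBackoffMs = initialBackoffMs;
    }

    // Returns true if the message was sent, false if it ended up in the fallback queue.
    boolean send(String message) {
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                sender.send(message);
                return true;
            } catch (Exception sendFailure) {
                if (attempt == maxAttempts) {
                    break;
                }
                try {
                    // Blocking the caller here is what pushes back on the upstream code.
                    Thread.sleep(backoff);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    break;
                }
                backoff *= 2;
            }
        }
        fallback.add(message);
        return false;
    }

    // Messages waiting to be re-ingested later.
    Queue<String> fallback() {
        return fallback;
    }
}
```

A separate task would then drain fallback() and resend its contents once the brokers are reachable again; that part is left out of the sketch.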
By default, Kafka limits each message in a topic to 1 MB, because very large messages are considered inefficient and an anti-pattern in Apache Kafka. In this lesson we will look at two approaches for handling larger messages in Kafka.
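The text above does not say which two approaches it means. One commonly used option, shown here purely as an assumption, is to split a large payload into chunks that each stay below the limit. PayloadChunker and its chunk method are hypothetical names for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: split a payload into pieces no larger than maxChunkBytes.
final class PayloadChunker {
    static List<byte[]> chunk(byte[] payload, int maxChunkBytes) {
        if (maxChunkBytes <= 0) {
            throw new IllegalArgumentException("maxChunkBytes must be positive");
        }
        List<byte[]> chunks = new ArrayList<byte[]>();
        for (int start = 0; start < payload.length; start += maxChunkBytes) {
            int end = Math.min(payload.length, start + maxChunkBytes);
            chunks.add(Arrays.copyOfRange(payload, start, end));
        }
        return chunks;
    }
}
```

The sketch covers only the splitting step. To reassemble the payload, a consumer would also need a shared message identifier and a chunk index on every piece, and all chunks of one payload should go to the same partition so that their order is preserved.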
Also, since Aiven Kafka services are offered only over encrypted TLS connections, we included the configuration for these, namely the required certificates and keys. librdkafka defaults to a maximum batch size of 10000 messages or a maximum request size of one million bytes, whichever is reached first.
Conclusion: Kafka can handle the volume. With 5 brokers, and with the Kafka cluster autoscaled to 4 vCPU and 12 GB of memory, we are able to sustain 200 million events per hour, or about 55,000 events per second.
I got these errors when running a Kafka producer:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Found a solution:
On my Mac, after I downloaded scala-2.10 and kafka_2.10-0.8.1, everything worked from the kafka_2.10-0.8.1 directory when I started ZooKeeper and the Kafka server and created a test topic. Then I needed to start a producer for the test topic, but there was an error:
yhuangMac:kafka_2.10-0.8.1 yhuang$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
The reason is that the libs directory of the Kafka release archive includes only the slf4j-api jar; the slf4j-nop jar is missing. So we have to go to http://www.slf4j.org, download slf4j-1.7.7.zip, unzip it, and copy slf4j-api-1.7.7.jar and slf4j-nop-1.7.7.jar into Kafka's libs directory.
Restart the Kafka producer; the error is no longer reported.
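The warning names the class that SLF4J failed to load, so one way to confirm the fix is to check whether that class is visible on the classpath. This is a small sketch; Slf4jBindingCheck and isOnClasspath are hypothetical names, and the check assumes the program is run with the same classpath as the producer (for example with Kafka's libs directory on it).

```java
// Hypothetical sketch: reports whether the binder class named in the SLF4J
// warning can be found on the current classpath.
final class Slf4jBindingCheck {
    static boolean isOnClasspath(String className) {
        try {
            // Look the class up without running its static initializers.
            Class.forName(className, false, Slf4jBindingCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String binder = "org.slf4j.impl.StaticLoggerBinder";
        System.out.println(binder + " present: " + isOnClasspath(binder));
    }
}
```

If the check reports false after the jars have been copied, the slf4j-nop jar is most likely not in the directory that the producer script puts on the classpath.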