I need help publishing a message to a topic using a Kafka producer. My Kafka producer client is written in Scala and runs on Spark.
My job completes successfully, but it seems my message is not published.
Here is the code:
val response = info.producer.asInstanceOf[KafkaProducer[K, V]].send(new ProducerRecord(info.props.getProperty(s"$topicNickName.topic"), keyMessage._1, keyMessage._2))
Producer Config Values
metric.reporters = []
metadata.max.age.ms = 300000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [x.data.edh:6667, y.data.edh:6667, z.data.edh:6667, a.data.edh:6667, b.data.edh:6667]
ssl.keystore.type = JKS
sasl.mechanism = GSSAPI
max.block.ms = 60000
interceptor.classes = null
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
request.timeout.ms = 30000
acks = 1
receive.buffer.bytes = 32768
ssl.truststore.type = JKS
retries = 0
ssl.truststore.location = null
ssl.keystore.password = null
send.buffer.bytes = 131072
compression.type = none
metadata.fetch.timeout.ms = 60000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
batch.size = 16384
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.StringSerializer
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
linger.ms = 0
How do I debug this problem?
Here is an example of how to produce messages to Kafka in Scala:
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
val kafkaProducerProps: Properties = {
val props = new Properties()
props.put("bootstrap.servers", "x.data.edh:6667")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)
props
}
val producer = new KafkaProducer[String, String](kafkaProducerProps)
producer.send(new ProducerRecord[String, String]("myTopic", keyMessage._1, keyMessage._2))
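If the job finishes without errors but nothing appears on the topic, a likely cause is that send() is asynchronous: records sit in the producer's buffer, and a short-lived Spark task can exit before they are transmitted. A minimal debugging sketch, reusing the producer and keyMessage from the example above (it also needs import org.apache.kafka.clients.producer.RecordMetadata):

// Block on the Future returned by send(); this surfaces any exception
// (authorization, timeout, serialization) that fire-and-forget would swallow.
val metadata: RecordMetadata =
  producer.send(new ProducerRecord[String, String]("myTopic", keyMessage._1, keyMessage._2)).get()
println(s"Published to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")

// Flush buffered records and release resources before the JVM exits;
// otherwise buffered messages can be silently dropped.
producer.flush()
producer.close()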
If you want to do streaming, I recommend having a look at the Spark + Kafka Integration Guide.
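For reference, here is a minimal Structured Streaming sketch that writes to Kafka through the built-in sink; the broker address, topic name, and paths are placeholders, and the socket source is just a stand-in input. The Kafka sink only needs a string or binary value column (plus an optional key):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("KafkaSinkExample").getOrCreate()

// Stand-in streaming source for this sketch: lines read from a local socket.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", "9999")
  .load()

lines.selectExpr("CAST(value AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "x.data.edh:6667")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/tmp/kafka-sink-checkpoint") // required by streaming sinks
  .start()
  .awaitTermination()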
Note that the first example above uses the KafkaProducer in fire-and-forget mode. There are three modes in which a Kafka producer can be used:
Fire-and-forget: We send a message to the server and don't really care if it arrives successfully or not. Most of the time it will arrive successfully, since Kafka is highly available and the producer retries sending messages automatically. However, some messages can get lost using this method.
Synchronous send: We send a message, the send() method returns a Future object, and we use get() to wait on the future and see if the send() was successful or not.
Asynchronous send: We call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker. The snippets below show the synchronous and asynchronous variants.
// Synchronous send: block on the returned Future to confirm delivery
producer.send(record).get()

// Asynchronous send: pass a callback that is invoked with the broker's response
producer.send(record, new compareProducerCallback)

// Flush any records still buffered before the application exits
producer.flush()
// Requires: import org.apache.kafka.clients.producer.{Callback, RecordMetadata}
// The Callback trait contains only the one abstract method onCompletion.
private class compareProducerCallback extends Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null) {
      exception.printStackTrace()
    }
  }
}
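To verify end to end, you can also read the topic back with the console consumer that ships with Kafka, using the broker address and topic from your setup:

bin/kafka-console-consumer.sh --bootstrap-server x.data.edh:6667 --topic myTopic --from-beginning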