
Apache camel route from kafka to another kafka topic on error

Tags:

apache-camel

I am using Apache Camel to consume messages from a Kafka topic and process them. If an exception occurs during processing, I redirect the message to another Kafka topic and handle it in a separate route. My routes look something like this:

// Global onException clauses must be declared before the routes they apply to.
onException(Throwable.class).process(exchange -> { exchange.getIn().setBody("Message with error details"); }).to("kafka2");
from("kafka1").process("someProcessor").end();

The code above actually sends the error message to the same Kafka topic it was consumed from (kafka1).

I worked around this by calling exchange.getIn().setHeader(KafkaConstants.TOPIC, "kafka2") in the onException processor. Is this the expected behavior? Why would it ignore kafka2 and use kafka1 instead?

  1. Camel version used: 2.14.0

  2. Kafka endpoint URLs:

Consumer:

from("kafka:" + ("kafka.broker") + "?topic="
            + ("offer.kafka.topic")
            + "&zookeeperHost=" + ("kafka.zookeeper.host")
            + "&zookeeperPort=" + ("kafka.zookeeper.port")
            + "&groupId=" + ("offer.kafka.group.id")
            + "&consumerStreams=" + ("kafka.streams")
            + "&autoCommitIntervalMs=" + ("product.kafka.consumer.auto.commit.intervals")
            + "&zookeeperConnectionTimeoutMs=" + ("zookeeper.connection.timeout")
            + "&rebalanceMaxRetries=" + ("kafka.rebalance.max.retries")
            + "&rebalanceBackoffMs=" + ("kafka.rebalance.backoffs.ms")
            + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
            + "&autoOffsetReset=" + ("kafka.auto.offset.reset")
            + "&fetchMessageMaxBytes=" + ("kafka.fetch.message.max.bytes")
            + "&socketReceiveBufferBytes=" + ("receive.buffer.bytes"))
            .routeId("offerEventRoute").to("direct:offerEventRoute");

Producer:

to("kafka:" + ("error.kafka.broker") + "?topic="
                        + ("error.kafka.topic")
                        + "&zookeeperHost=" + ("error.kafka.zookeeper.host")
                        + "&zookeeperPort=" + ("error.kafka.zookeeper.port")
                        + "&groupId=" + ("error.kafka.group.id")
                        + "&zookeeperConnectionTimeoutMs=" + ("error.zookeeper.connection.timeout")
                        + "&rebalanceMaxRetries=" + ("rebalance.max.retries")
                        + "&rebalanceBackoffMs=" + ("rebalance.backoffs.ms")
                        + "&zookeeperSessionTimeoutMs=" + ("zookeeper.session.timeout")
                        + "&autoOffsetReset=" + ("auto.offset.reset")
                        + "&messageSendMaxRetries=" + ("error.max.retries")
                        + "&serializerClass=kafka.serializer.StringEncoder"
        );
Amit asked Jan 26 '26 00:01


1 Answer

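You need to set bridgeEndpoint to true on your producer Kafka endpoint. Otherwise the producer looks for a topic name in the exchange headers and, if it finds one, uses it as the destination topic instead of the topic in the endpoint URI. Because the Kafka consumer puts the topic it read from into the KafkaConstants.TOPIC header, the error message goes back to kafka1.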

By default it is false.
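
To make the behavior concrete, here is a minimal plain-Java sketch of the topic selection described above. It is a simplified model, not Camel's actual source: the class name, the resolveTopic helper and the literal header key are assumptions made for illustration. It shows why the header left by the consumer wins by default, and why both the setHeader workaround and bridgeEndpoint=true send the message to kafka2.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of how a producer could pick its destination topic.
// Hypothetical names; this is not the camel-kafka implementation.
class TopicResolutionSketch {

    // Stand-in for the header key behind KafkaConstants.TOPIC (illustrative value).
    static final String TOPIC_HEADER = "kafka.TOPIC";

    // Returns the topic the message would be sent to.
    static String resolveTopic(String uriTopic, Map<String, Object> headers, boolean bridgeEndpoint) {
        Object headerTopic = headers.get(TOPIC_HEADER);
        // With bridgeEndpoint=false (the default), a topic header overrides the URI topic.
        if (!bridgeEndpoint && headerTopic != null) {
            return headerTopic.toString();
        }
        // With bridgeEndpoint=true, or when no header is present, the URI topic is used.
        return uriTopic;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        // The consumer recorded the topic the message came from.
        headers.put(TOPIC_HEADER, "kafka1");

        // Default behavior: the header wins, so the error goes back to kafka1.
        System.out.println(resolveTopic("kafka2", headers, false));

        // Fix from the answer: the header is ignored, so the URI topic kafka2 is used.
        System.out.println(resolveTopic("kafka2", headers, true));

        // Workaround from the question: overwrite the header in the onException processor.
        headers.put(TOPIC_HEADER, "kafka2");
        System.out.println(resolveTopic("kafka2", headers, false));
    }
}
```

On the real endpoint this would correspond to appending &bridgeEndpoint=true to the producer URI shown in the question, assuming the camel-kafka version in use supports that option.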

yaswanth answered Jan 27 '26 13:01


