 

How to transactionally poll Kafka from Camel?

I'm currently working on a message bus based on Kafka, managed by Camel and Spring. I have an XML route definition to poll events and retrieve the corresponding complete business objects from an external API; it looks like this:


<route id="station-event-enrich-route" autoStartup="true" >
        <from
            uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&amp;topic={{events.topic.name}}&amp;autoCommitEnable=false&amp;allowManualCommit=true&amp;maxPollRecords={{station.brocker.bulk.limit}}&amp;groupId={{kafka.groupId}}" />

        <!-- SNIP logic to aggregate several events -->

        <pollEnrich strategyRef="keepHeadersAggregationStrategy">
            <simple>{{api.url}}?view=full&amp;id=$simple{in.headers.BUSINESS_ID}</simple>
        </pollEnrich>

        <!-- SNIP logic to split the retrieved events according to their ids -->

        <to uri="velocity:velocity/resource-object.vm"/>    

        <removeHeaders pattern="*" excludePattern="MANUAL_COMMIT"/>

        <to uri="kafka:{{kafka.cluster.url}}?brokers={{kafka.cluster.url}}&amp;topic={{objects.topic.name}}&amp;groupId={{kafka.groupId}}&amp;requestRequiredAcks=all" />

        <transform>
            <simple>${headers.MANUAL_COMMIT.commitSync()}</simple>
        </transform>
</route>

My problem is the following: when the Kafka event topic is polled and the api.url in my pollEnrich is unavailable, no business object is retrieved and the event is lost. So I need to implement transactional logic in my route so that the initial Kafka poll can be rolled back, letting the same event be polled several times until api.url sends me the awaited business object.

I tried several approaches, starting with upgrading org.apache.camel:camel-kafka to 2.22.0 to be able to use manual commits. I then implemented a basic error handler (configured with maximumRedeliveries=-1 for infinite retries) so that when pollEnrich triggers an onException, I can set a header to skip the final manual commit. It apparently works, but my event is never polled again.
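
For reference, the error handler I tried looks roughly like this when expressed in the Java DSL (the SKIP_MANUAL_COMMIT header name is illustrative; my actual route uses the XML equivalent):

    // inside configure() of the RouteBuilder: sketch of the attempted error handler
    onException(Exception.class)
        .maximumRedeliveries(-1)                          // infinite retries
        .setHeader("SKIP_MANUAL_COMMIT", constant(true)); // checked before the final commitSync()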

I also tried to use the transacted tag with an org.springframework.kafka.transaction.KafkaTransactionManager instance from spring-kafka, but that is not the right approach, since only producers are transactional.

What am I missing, and what is the correct approach?

I use Java 8, Camel 2.22.0 and Spring 4.3.18.RELEASE (not recommended but it should work).

1 Answer

Kafka manual commit support is a relatively new feature in Camel, and the documentation isn't particularly clear. I'm using Camel 2.22.1.

From the description of your problem, you are looking for "at least once" semantics: you want to be able to re-process a message when there was an issue. The consequence of this approach is that no other messages in the partition with a failing message can be processed (or seen) until the application successfully processes it. In the case of a failing service, this would likely block all partitions of the topic until the service is back up.

The Kafka URI to get this to work would look like this:

kafka:TestLog?brokers=localhost:9092&groupId=kafkaGroup&maxPollRecords=3&consumersCount=1&autoOffsetReset=earliest&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true

Breaking that down a bit:

  • kafka:TestLog : specifies the Kafka topic to consume from
  • brokers=localhost:9092 : specifies the bootstrap servers for the Kafka cluster
  • groupId=kafkaGroup : specifies the Kafka consumer group
  • consumersCount=1 : specifies the number of Kafka consumers for that Camel route

The last two configuration settings are important when consuming from a Kafka topic with a number of partitions. They need to be tuned/configured to take into account the number of Camel instances you are planning to run.

The more interesting configuration to get to "at least once" semantics:

  • autoCommitEnable=false : turn off auto committing of offsets so we can use manual commits.
  • allowManualCommit=true : turn on manual commits, giving us access to the KafkaManualCommit capability (see code below).
  • breakOnFirstError=true : when this is true, the route stops processing the rest of the messages in the batch received on the last poll, and the consumer seeks back so the failing message is polled again.
  • maxPollRecords=3 : specifies the maximum number of messages consumed during a single poll of the Kafka topic. It is probably a good idea to keep this set to a low number, since an issue with one message in the batch causes all of the messages in the batch to be re-processed.
  • autoOffsetReset=earliest : will cause the consumer to read from the earliest offset when there is a difference between the current offset and the offset marking the end of the partition (more on that in a bit).
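
For reference, the kafkaUrl used by the route below is just that URI held in a constant (the field name is mine):

    // the consumer endpoint URI from above, held in a constant for the route
    private static final String kafkaUrl =
            "kafka:TestLog?brokers=localhost:9092&groupId=kafkaGroup"
            + "&maxPollRecords=3&consumersCount=1&autoOffsetReset=earliest"
            + "&autoCommitEnable=false&allowManualCommit=true&breakOnFirstError=true";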

The Camel route would look something like this:

      from(kafkaUrl)
        .routeId("consumeFromKafka")
        .process(exchange -> {
            LOGGER.info(this.dumpKafkaDetails(exchange));
        })
        .process(exchange -> {
            // do something
        })
        .process(exchange -> {
            // do something else
        })
        .process(exchange -> {
            exchange.setProperty(Exchange.FILE_NAME, UUID.randomUUID().toString() + ".txt");
        })
        .to("file://files")
        // at the end of the route 
        // manage the manual commit
        .process(exchange -> {
            // manually commit offset if it is last message in batch
            Boolean lastOne = exchange.getIn().getHeader(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, Boolean.class);

            // guard against a null header to avoid an NPE
            if (Boolean.TRUE.equals(lastOne)) {
                KafkaManualCommit manual =
                        exchange.getIn().getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
                if (manual != null) {
                    LOGGER.info("manually committing the offset for batch");
                    manual.commitSync();
                }
            } else {
                LOGGER.info("NOT time to commit the offset yet");
            }
        });
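
To see the re-delivery behavior in action, you can make one of the intermediate "do something" processors fail deliberately. A minimal sketch (the trigger condition is made up): any exception thrown before the manual commit leaves the offset uncommitted, and breakOnFirstError=true makes the consumer seek back to the failing record on the next poll.

    .process(exchange -> {
        // simulate the enrichment API being down (illustrative trigger)
        String body = exchange.getIn().getBody(String.class);
        if (body != null && body.contains("FAIL")) {
            throw new RuntimeException("simulated enrichment failure");
        }
    })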

After running this route and getting an error, you can see the state of the consumer group with this command:

./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group kafkaGroup --describe

which might yield this result:

TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
TestLog  0          92              95              3

This is where the autoOffsetReset setting comes into play. The current offset is where the consumer group will consume from next. If that offset (92) is the failing message, then the group will fall behind as more messages (in this case two more) are added. With the given settings, Camel will continually re-process the message at offset 92 until it succeeds. If the Camel route is stopped and started, the application picks up consuming from that offset (92) rather than the latest (95), based on autoOffsetReset=earliest. Using latest would result in "lost" messages, because a restart of Camel would start processing from the latest offset.

A sample application is available here.
