
Start reading Kafka topic from specific Offset in Apache Camel

I have read all of the Camel Kafka documentation, and the only approach I found is this one from the Camel Git repository, a route builder that specifies:

    public void configure() throws Exception {
        from("kafka:" + TOPIC
                + "?groupId=A"
                + "&autoOffsetReset=earliest"   // Ask to start from the beginning if we have unknown offset
                + "&consumersCount=2"           // We have 2 partitions, we want 1 consumer per partition
                + "&offsetRepository=#offset")  // Keep the offset in our repository
            .to("mock:result");
    }

However, for the clients I have to use Spring, so my Kafka endpoint is defined like this:

    <!-- DEFINE KAFKA'S TOPICS AS ENDPOINT -->
    <endpoint id="tagBlink" uri="kafka:10.0.0.165:9092">
        <property key="topic" value="tagBlink"/>
        <property key="brokers" value="10.0.0.165:9092"/>
        <property key="offsetRepository" value="100"/>
    </endpoint>

But I get this exception:

Could not find a suitable setter for property: offsetRepository as there isn't a setter method with same type: java.lang.String nor type conversion possible: No type converter available to convert from type: java.lang.String to the required type: org.apache.camel.spi.StateRepository with value 100

Is this possible with my current configuration? How can I resume from a specific offset?

Gatusko asked Jul 20 '18 21:07




1 Answer

After some time I got this working. I followed the Spring bean creation approach. According to the documentation, FileStateRepository needs a File, so I created a File bean and passed it as a constructor-arg. I then added init-method="doStart". This method loads the file if it exists and creates it if it does not.

    <endpoint id="event" uri="kafka:localhost:9092">
        <property key="topic" value="eventTopic4"/>
        <property key="brokers" value="localhost:9092"/>
        <property key="autoOffsetReset" value="earliest"/>
        <property key="offsetRepository" value="#myRepo2"/>
    </endpoint>

    <bean id="myFileOfMyRepo" class="java.io.File">
        <constructor-arg type="java.lang.String" value="C:\repoDat\repo.dat"/>
    </bean>

    <bean id="myRepo2" class="org.apache.camel.impl.FileStateRepository" factory-method="fileStateRepository" init-method="doStart">
        <constructor-arg ref="myFileOfMyRepo"/>
    </bean>
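
The repository file can also be prepared by hand, which is one way to begin at a chosen offset rather than the last one read. The sketch below is only an illustration and is not Camel code. It assumes the file holds one `key=value` entry per line and that the Kafka consumer uses keys of the form `topic/partition`; both are assumptions that should be checked against the Camel version in use. `StateFileSeeder`, `writeEntry` and `readEntries` are hypothetical names, and only the JDK is used.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Camel.
final class StateFileSeeder {

    // ASSUMPTION: the state file stores one "key=value" entry per line.
    // Overwrites the file with a single entry.
    static void writeEntry(Path file, String key, String value) {
        try {
            Files.write(file, Collections.singletonList(key + "=" + value), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the file back as a list of raw lines.
    static List<String> readEntries(Path file) {
        try {
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A temp-dir path is used here instead of the real repo.dat location.
        Path file = Paths.get(System.getProperty("java.io.tmpdir"), "repo-sketch.dat");
        // ASSUMPTION: key format is "topic/partition".
        writeEntry(file, "eventTopic4/0", "99");
        System.out.println(readEntries(file));
    }
}
```

Because the consumer code quoted further down seeks to the stored value plus one, storing 99 for partition 0 should make consumption begin at offset 100. It is probably safest to write the file while the route is stopped, so the running repository does not overwrite it.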

After this I looked at the code of Camel's KafkaConsumer in the Git repository:

    String offsetState = offsetRepository.getState(serializeOffsetKey(topicPartition));
    if (offsetState != null && !offsetState.isEmpty()) {
        // The state contains the last read offset so you need to seek from the next one
        long offset = deserializeOffsetValue(offsetState) + 1;
        log.debug("Resuming partition {} from offset {} from state", topicPartition.partition(), offset);
        consumer.seek(topicPartition, offset);
    } 
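
The effect of that `+ 1` can be checked in isolation. The following is a minimal sketch with a plain `Map` standing in for the offset repository. `ResumeOffsetSketch` and its methods are hypothetical names, and the `topic/partition` key format and decimal string value are assumptions about what `serializeOffsetKey` and `deserializeOffsetValue` do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for the resume logic; a Map replaces the StateRepository.
final class ResumeOffsetSketch {

    // ASSUMPTION: keys look like "topic/partition".
    static String offsetKey(String topic, int partition) {
        return topic + "/" + partition;
    }

    // Returns the offset to seek to, or empty when no state is stored.
    static OptionalLong resumeOffset(Map<String, String> repo, String topic, int partition) {
        String state = repo.get(offsetKey(topic, partition));
        if (state == null || state.isEmpty()) {
            return OptionalLong.empty();
        }
        // The state holds the last read offset, so reading resumes at the next one.
        return OptionalLong.of(Long.parseLong(state) + 1);
    }

    public static void main(String[] args) {
        Map<String, String> repo = new HashMap<>();
        System.out.println(resumeOffset(repo, "eventTopic4", 0).isPresent());
        repo.put(offsetKey("eventTopic4", 0), "99");
        System.out.println(resumeOffset(repo, "eventTopic4", 0).getAsLong());
    }
}
```

When nothing is stored for a partition, no seek happens in this sketch, which matches the snippet above: the consumer is then left at its normal starting position, where a setting such as `autoOffsetReset` applies.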

With this I managed to resume reading from the last offset. I hope the Camel documentation adds these extra steps for Kafka.

Gatusko answered Oct 20 '22 20:10
