
spring-kafka - how to read one topic from the beginning, while reading another one from the end?

I'm writing a spring-kafka app, in which I need to read 2 topics: test1 and test2:

public class Receiver {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(Receiver.class);

    @KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
  @TopicPartition(topic = "test2", partitions = { "0" })})
    public void receiveMessage(String message) {
        LOGGER.info("received message='{}'", message);
    }
}

My config looks like this:

@Configuration
@EnableKafka
public class ReceiverConfig {

    @Value("${kafka.bootstrap.servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        // list of host:port pairs used for establishing the initial connections
        // to the Kafka cluster
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        // consumer groups allow a pool of processes to divide the work of
        // consuming and processing records
        // note: a consumer has a single group.id; putting the same key twice
        // means only the last value takes effect
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");

        return props;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public Receiver receiver() {
        return new Receiver();
    }
}

I need to be able to read only the latest messages from "test1", while being able to read all messages from the very beginning of "test2". I'm only interested in "test2" messages upon my app startup, but the "test1" messages need to be read continuously as long as the app is running.

Is there a way to configure such functionality?

asked Nov 29 '22 by Eugene Goldberg

2 Answers

I have struggled with this issue as well and would like to present a solution that is a little more general.

While your solution works, it requires you to hard-code the partitions. Instead, you could have the class that carries the @KafkaListener annotation implement the ConsumerSeekAware interface.

The interface provides callback methods that can be used to seek to a specific offset. One of them, onPartitionsAssigned, is invoked as soon as the partitions are assigned, so it could look like this:

@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    assignments.keySet().stream()
        .filter(partition -> "MYTOPIC".equals(partition.topic()))
        .forEach(partition -> callback.seekToBeginning("MYTOPIC", partition.partition()));
}

That way you don't need to touch any code when you decide to add more partitions to the topic :)
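Put together, a fuller sketch of such a listener might look like the following (class name, listener id, and topic names are illustrative, not from the original question; it assumes test2 should be replayed from the beginning while test1 is tailed as usual):

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;

public class Receiver implements ConsumerSeekAware {

    // Invoked by the container on every partition assignment. Seek to the
    // beginning only for partitions of "test2"; "test1" keeps its committed
    // (or latest) offset and is tailed normally.
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments,
                                     ConsumerSeekCallback callback) {
        assignments.keySet().stream()
                .filter(tp -> "test2".equals(tp.topic()))
                .forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    @KafkaListener(id = "bar", topics = { "test1", "test2" })
    public void receiveMessage(String message) {
        // process the message
    }
}
```

Because the seek happens inside the assignment callback, it also fires again after a rebalance, so the replay behavior survives partition reassignment without any hard-coded partition numbers.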

Hope this helps someone.

answered Dec 04 '22 by konse



Here is a way, which worked for me:

@KafkaListener(id = "receiver-api", topicPartitions = {
        @TopicPartition(topic = "schema.topic",
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
        @TopicPartition(topic = "data.topic", partitions = { "0" }) })
public void receiveMessage(String message) {
    try {
        JSONObject incomingJsonObject = new JSONObject(message);
        if (!incomingJsonObject.isNull("data")) {
            handleSchemaMessage(incomingJsonObject);
        } else {
            handleDataMessage(incomingJsonObject);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Using the "partitionOffsets" attribute (import org.springframework.kafka.annotation.PartitionOffset;)

was the key to always being able to read a specific topic from the beginning, while "tailing" the other topic as usual.
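For reference, a minimal, self-contained form of this approach (class name illustrative; the handler bodies from the answer above are stubbed out):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;

public class Listener {

    // schema.topic partition 0: replayed from offset 0 on startup.
    // data.topic partition 0: tailed from the committed/latest offset.
    @KafkaListener(id = "receiver-api", topicPartitions = {
            @TopicPartition(topic = "schema.topic",
                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
            @TopicPartition(topic = "data.topic", partitions = "0") })
    public void receiveMessage(String message) {
        // dispatch to schema/data handling as appropriate
    }
}
```

Note that initialOffset applies each time the container starts, so the schema topic is re-read from the beginning on every application restart, which matches the "only at startup" requirement in the question.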

answered Dec 04 '22 by Eugene Goldberg