I'm writing a spring-kafka app, in which I need to read 2 topics: test1 and test2:
public class Receiver {
private static final Logger LOGGER = LoggerFactory
.getLogger(Receiver.class);
@KafkaListener(id = "bar", topicPartitions =
{ @TopicPartition(topic = "test1", partitions = { "0" }),
@TopicPartition(topic = "test2", partitions = { "0" })})
public void receiveMessage(String message) {
LOGGER.info("received message='{}'", message);
}
}
My config looks like this:
@Configuration
@EnableKafka
public class ReceiverConfig {
@Value("${kafka.bootstrap.servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
// list of host:port pairs used for establishing the initial connections
// to the Kafka cluster
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
// consumer groups allow a pool of processes to divide the work of
// consuming and processing records
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test1");
// note: this second put() overwrites the previous one, so the effective group id is "test2"
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");
return props;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public Receiver receiver() {
return new Receiver();
}
}
I need to be able to read only the latest messages from "test1", while being able to read all messages from the very beginning of "test2". I'm only interested in "test2" messages upon my app startup, but the "test1" messages need to be read continuously as long as the app is running.
Is there a way to configure such functionality?
I have struggled with this issue as well and would like to present a solution that is a little more general.
While your solution works, you need to hard-code the partitions.
You could also have the class that uses the @KafkaListener annotation implement the ConsumerSeekAware interface.
This provides you with three methods that can be used to seek to a specific offset. There is one method that will be invoked as soon as the partitions are assigned. So it could look like this.
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().stream()
.filter(partition -> "MYTOPIC".equals(partition.topic()))
.forEach(partition -> callback.seekToBeginning("MYTOPIC", partition.partition()));
}
That way you don't need to touch any code when you decide to add more partitions to the topic :)
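To make the selection logic easier to try out without a broker, here is a minimal, dependency-free sketch of the same idea. Note that TopicPartition and SeekCallback below are hypothetical stand-ins I wrote for this example (they only mimic org.apache.kafka.common.TopicPartition and the seekToBeginning method of ConsumerSeekCallback), and the topic names and offsets are sample values taken from the question, not real data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SeekSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }
    }

    // Hypothetical stand-in for the seekToBeginning method of ConsumerSeekCallback.
    interface SeekCallback {
        void seekToBeginning(String topic, int partition);
    }

    // Mirrors the body of onPartitionsAssigned: rewind every assigned
    // partition of replayTopic and leave all other partitions untouched.
    static void rewindTopic(Map<TopicPartition, Long> assignments,
                            String replayTopic, SeekCallback callback) {
        assignments.keySet().stream()
                .filter(tp -> replayTopic.equals(tp.topic()))
                .forEach(tp -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

    // Runs the logic against a sample assignment and returns the recorded seeks.
    static List<String> demo() {
        Map<TopicPartition, Long> assignments = new LinkedHashMap<>();
        assignments.put(new TopicPartition("test1", 0), 42L);
        assignments.put(new TopicPartition("test2", 0), 17L);
        assignments.put(new TopicPartition("test2", 1), 5L);

        List<String> seeks = new ArrayList<>();
        rewindTopic(assignments, "test2",
                (topic, partition) -> seeks.add(topic + "-" + partition));
        return seeks;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [test2-0, test2-1]
    }
}
```

Only the "test2" partitions are rewound, however many of them are assigned, while "test1" keeps whatever position the consumer already had. In the real listener the same filter would sit inside onPartitionsAssigned, as shown above.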
Hope this helps someone.
Here is an approach that worked for me:
@KafkaListener(id = "receiver-api",
topicPartitions =
{ @TopicPartition(topic = "schema.topic",
partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
@TopicPartition(topic = "data.topic", partitions = { "0" })})
public void receiveMessage(String message) {
try {
JSONObject incomingJsonObject = new JSONObject(message);
if(!incomingJsonObject.isNull("data")){
handleSchemaMessage(incomingJsonObject);
}
else {
handleDataMessage(incomingJsonObject);
}
} catch (Exception e) {
e.printStackTrace();
}
}
Using the "partitionOffsets" attribute with the @PartitionOffset annotation (import org.springframework.kafka.annotation.PartitionOffset;)
was the key to always reading a specific topic from the beginning, while "tailing" the other topic as usual.