 

Are there any problems with this way of starting an infinite loop in a Spring Boot application?

I have a Spring Boot application that needs to process some Kafka streaming data. I added an infinite loop to a CommandLineRunner class that runs on startup. Inside it is a Kafka consumer that can be woken up. I added a shutdown hook with Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));. Will I run into any problems? Is there a more idiomatic way of doing this in Spring? Should I use @Scheduled instead? The code below is stripped of Kafka-specific implementation details but is otherwise complete.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Properties;


@Component
public class InfiniteLoopStarter implements CommandLineRunner {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void run(String... args) {
        Consumer<AccountKey, Account> consumer = new KafkaConsumer<>(new Properties());
        Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));

        try {
            while (true) {
                ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
                //process records
            }
        } catch (WakeupException e) {
            logger.info("Consumer woken up for exiting.");
        } finally {
            consumer.close();
            logger.info("Closed consumer, exiting.");
        }
    }
}
asked Jan 15 '19 by Sebastiaan van den Broek

3 Answers

I'm not sure if you'll run into any issues there, but it's a bit dirty. Spring has really nice built-in support for working with Kafka, so I would lean towards that (there's plenty of documentation on the web; a nice one is https://www.baeldung.com/spring-kafka).

You'll need the following dependency:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

Configuration is as easy as adding the @EnableKafka annotation to a config class and then setting up the listener container factory and ConsumerFactory beans.
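A minimal sketch of such a config class might look like this (the broker address, group id, and String deserializers here are placeholder assumptions; swap in your own):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder broker address and group id -- replace with your own values
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        // Container factory that @KafkaListener methods will use under the hood
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}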

Once configured, you can set up a consumer easily as follows:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
    System.out.println("Received Message: " + message + " from partition: " + partition);
}
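With this approach Spring manages the consumer threads and stops the listener containers on context shutdown, so there is no hand-written loop or shutdown hook to maintain.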
answered Nov 03 '22 by Markoorn

The implementation looks OK, but CommandLineRunner is not made for this: it is meant to run a task once at startup, so from a design perspective it's not very elegant. I would rather use the Spring Integration Kafka adapter component. You can find an example here: https://github.com/raphaelbrugier/spring-integration-kafka-sample/blob/master/src/main/java/com/github/rbrugier/esb/consumer/Consumer.java
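A rough sketch of that approach (the topic and channel names are made up for illustration, and exact package paths vary between spring-integration-kafka versions):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.messaging.MessageChannel;

@Configuration
public class KafkaIntegrationConfig {

    @Bean
    public KafkaMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory) {
        // The listener container owns the polling and threading; no hand-written loop needed
        return new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties("some-topic"));
    }

    @Bean
    public KafkaMessageDrivenChannelAdapter<String, String> adapter(KafkaMessageListenerContainer<String, String> container) {
        // Bridges incoming Kafka records onto a Spring Integration message channel
        KafkaMessageDrivenChannelAdapter<String, String> adapter = new KafkaMessageDrivenChannelAdapter<>(container);
        adapter.setOutputChannel(fromKafka());
        return adapter;
    }

    @Bean
    public MessageChannel fromKafka() {
        return new DirectChannel();
    }
}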

answered Nov 03 '22 by Rishi Saraf


To answer my own question: I had a look at Kafka integration libraries like Spring Kafka and Spring Cloud Stream, but the integration with Confluent's Schema Registry was either not finished or not quite clear to me. It's simple enough for primitives, but we need it for typed Avro objects that are validated by the Schema Registry. I have now implemented a Kafka-agnostic solution, based on the answer at Spring Boot - Best way to start a background thread on deployment.

The final code looks like this:

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// AccountService, KafkaProperties, ConfluentProperties, AccountKey and Account
// are application-specific classes
@Component
public class AccountStreamConsumer implements DisposableBean, Runnable {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final AccountService accountService;
    private final KafkaProperties kafkaProperties;
    private final Consumer<AccountKey, Account> consumer;

    @Autowired
    public AccountStreamConsumer(AccountService accountService, KafkaProperties kafkaProperties,
                                 ConfluentProperties confluentProperties) {

        this.accountService = accountService;
        this.kafkaProperties = kafkaProperties;

        if (!kafkaProperties.getEnabled()) {
            consumer = null;
            return;
        }

        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, confluentProperties.getSchemaRegistryUrl());
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, kafkaProperties.getSecurityProtocolConfig());
        props.put(SaslConfigs.SASL_MECHANISM, kafkaProperties.getSaslMechanism());
        props.put(SaslConfigs.SASL_JAAS_CONFIG, PlainLoginModule.class.getName() + " required username=\"" + kafkaProperties.getUsername() + "\" password=\"" + kafkaProperties.getPassword() + "\";");
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getAccountConsumerGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(kafkaProperties.getAccountsTopicName()));

        Thread thread = new Thread(this);
        thread.start();
    }

    @Override
    public void run() {
        if (!kafkaProperties.getEnabled())
            return;

        logger.debug("Started account stream consumer");
        try {
            //noinspection InfiniteLoopStatement
            while (true) {
                ConsumerRecords<AccountKey, Account> records = consumer.poll(Duration.ofSeconds(10L));
                List<Account> accounts = new ArrayList<>();
                records.iterator().forEachRemaining(record -> accounts.add(record.value()));
                if (!accounts.isEmpty())
                    accountService.store(accounts);
            }
        } catch (WakeupException e) {
            logger.info("Account stream consumer woken up for exiting.");
        } finally {
            consumer.close();
        }
    }

    @Override
    public void destroy() {
        if (consumer != null)
            consumer.wakeup();

        logger.info("Woke up account stream consumer, exiting.");
    }
}
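Spring calls DisposableBean#destroy on context shutdown, so consumer.wakeup() interrupts the blocking poll, the loop exits through the WakeupException handler, and the consumer is closed cleanly, all without registering a manual JVM shutdown hook.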
answered Nov 03 '22 by Sebastiaan van den Broek