Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Is there a code sample for multiple producers in spring kafka?

I have an application that may need multiple producers. All code samples I see seem to support a single producer, reading config from app during app startup. If there are multiple producers and we want to pass in different producer config, is there out of the box support in Spring? Or should I just go without spring in that case?

like image 708
Merrin Avatar asked Feb 24 '18 03:02

Merrin


People also ask

Can Kafka have multiple producer?

Kafka is able to seamlessly handle multiple producers that are using many topics or the same topic. The consumer subscribes to one or more topics and reads the messages.

How many producers does Kafka have?

One producer for all topics and partitions Probably the optimal choice for most applications. The Kafka producer batches any written message before sending them to the Kafka cluster for up to batch.

Is KafkaListener single threaded?

KafkaConsumer instances are single threaded, hence for each @KafkaListener method you define a new thread is created to execute the poll loop. You may wish to scale the number of consumers you have listening on a particular topic. There are several ways you may achieve this.


2 Answers

you will have to create two different ProducerFactory below is example

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.DefaultKafkaProducerFactory;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.kafka.core.ProducerFactory;

    import java.util.HashMap;

    @Configuration
    public class KafkaProducerConfig {


        @Bean
        public ProducerFactory<String, String> confluentProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9092");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }


        @Bean
        public ProducerFactory<String, String> cloudraProducerFactory() {

            HashMap<String, Object> configProps = new HashMap<String, Object>();
            configProps.put(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                    "localhost:9094");
            configProps.put(
                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            configProps.put(
                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    StringSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean(name = "confluent")
        public KafkaTemplate<String, String> confluentKafkaTemplate() {
            return new KafkaTemplate<>(confluentProducerFactory());
        }

        @Bean(name = "cloudera")
        public KafkaTemplate<String, String> clouderaKafkaTemplate() {
            return new KafkaTemplate<>(cloudraProducerFactory());
        }

    }




public class ProducerExample {

    @Autowired
    @Qualifier("cloudera")
    private KafkaTemplate clouderaKafkaTemplate;


    @Autowired
    @Qualifier("confluent")
    private KafkaTemplate confluentKafkaTemplate;

    public void send() {
        confluentKafkaTemplate.send("TestConfluent", "hey there..confluent");
        clouderaKafkaTemplate.send("TestCloudEra","hey there.. cloudera");
    }

}
like image 179
Girdhar Singh Rathore Avatar answered Sep 28 '22 08:09

Girdhar Singh Rathore


If you still want to keep your configuration in application.yaml as usual, and keep Java configuration as minimum as possible, you can extend KafkaProperties.Producer.


@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-1")
@RequiredArgsConstructor
class FirstProducer extends KafkaProperties.Producer {
    private final KafkaProperties common;

    @Qualifier("producer-1")
    @Bean
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);

    }

    @Qualifier("producer-1")
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());

    }
}

@Configuration
@ConfigurationProperties(prefix = "spring.kafka.producer-2")
@RequiredArgsConstructor
class SecondProducer extends KafkaProperties.Producer {
    private final KafkaProperties common;

    @Qualifier("producer-2")
    @Bean
    public ProducerFactory<?, ?> producerFactory() {
        final var conf = new HashMap<>(
            this.common.buildProducerProperties()
        );
        conf.putAll(this.buildProperties());
        return new DefaultKafkaProducerFactory<>(conf);

    }

    @Qualifier("producer-2")
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate() {
        return new KafkaTemplate<>(this.producerFactory());

    }
}

like image 28
andreoss Avatar answered Sep 28 '22 07:09

andreoss