
Spring Cloud Stream dynamic channels

I am using Spring Cloud Stream and want to create and bind channels programmatically. My use case is that during application startup I receive a dynamic list of Kafka topics to subscribe to. How can I then create a channel for each topic?

Nikem asked Jan 16 '17 11:01



1 Answer

I ran into a similar scenario recently. Below is my sample for creating SubscribableChannels dynamically.

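    // Assumed to be injected or otherwise in scope (not shown in this sample):
    // bindingServiceProperties, bindingTargetFactory, beanFactory, bindingService,
    // consumerMessageHandler, and the strings consumerName, retryTopic, consumerGroup.

    // Describe the binding: destination topic, consumer group, and a single delivery attempt.
    ConsumerProperties consumerProperties = new ConsumerProperties();
    consumerProperties.setMaxAttempts(1); // 1 disables retry
    BindingProperties bindingProperties = new BindingProperties();
    bindingProperties.setConsumer(consumerProperties);
    bindingProperties.setDestination(retryTopic);
    bindingProperties.setGroup(consumerGroup);

    // Register the binding properties under the binding name.
    bindingServiceProperties.getBindings().put(consumerName, bindingProperties);

    // Create the input channel and register it as an initialized singleton bean.
    SubscribableChannel channel = (SubscribableChannel) bindingTargetFactory.createInput(consumerName);
    beanFactory.registerSingleton(consumerName, channel);
    channel = (SubscribableChannel) beanFactory.initializeBean(channel, consumerName);

    // Bind the channel to the destination, then attach the message handler.
    bindingService.bindConsumer(channel, consumerName);
    channel.subscribe(consumerMessageHandler);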
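
To relate the sample above to a dynamic list of topics, here is a minimal plain-Java sketch (no Spring dependency) that computes, for each topic, the per-binding settings the sample sets through BindingProperties, written as the equivalent spring.cloud.stream.bindings.<name>.* property keys. The class DynamicBindingProperties, the "<topic>-in" naming scheme, and the topic and group names are hypothetical and chosen only for illustration. Note that these keys only describe a binding; in the approach above the channel still has to be created and bound in code for each name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring Cloud Stream.
class DynamicBindingProperties {

    private static final String PREFIX = "spring.cloud.stream.bindings.";

    // Hypothetical naming scheme: one binding per topic, named "<topic>-in".
    static String bindingName(String topic) {
        return topic + "-in";
    }

    // Builds destination, group, and consumer.max-attempts entries for every topic.
    static Map<String, String> forTopics(List<String> topics, String group, int maxAttempts) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String topic : topics) {
            String base = PREFIX + bindingName(topic);
            props.put(base + ".destination", topic);
            props.put(base + ".group", group);
            props.put(base + ".consumer.max-attempts", Integer.toString(maxAttempts));
        }
        return props;
    }

    public static void main(String[] args) {
        // Example topic and group names are made up.
        forTopics(List.of("orders", "payments"), "my-group", 1)
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Each binding name produced here would play the role of consumerName in the sample above, so the channel creation and binding steps would run once per topic.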
sash answered Nov 15 '22 10:11