I see that following annotations are depreciated for Spring Cloud Stream
@Input
@Output
@EnableBinding
@StreamListener
Please provide examples and links to documentation as how to do it in functional way.
This story talks about writing Spring Cloud Functions using Java Functional Program. Before that, we can write using @EnableBinding, @Input, @Output. But these annotations have been deprecated as of 3.1 in favor of the functional programming model.
The support for StreamListener is deprecated starting with 3.1. 0 of Spring Cloud Stream.
The annotation @EnableBinding configures the application to bind the channels INPUT and OUTPUT defined within the interface Processor. Both channels are bindings that can be configured to use a concrete messaging-middleware or binder.
Spring Cloud Function is a project by the Spring team at Pivotal that provides a way to build software around units of work called functions which can be deployed in a serverless platform or a webserver.
Instead of working with annotation-based configuration, spring now uses detected beans of Consumer/Function/Supplier to define your streams for you. Older version the code with annotation looks like below:
interface InputChannels {
@Input("input")
SubscribableChannel input();
}
@EnableBinding(InputChannels.class)
public class PubSubDemo {
@StreamListener("input")
public void listen() {
if (LOG.isInfoEnabled()) {
LOG.info(context.toString());
}
}
New version code will be like :
public class PubSubDemo {
@Bean
Consumer<String> input() {
return str -> {
if (LOG.isInfoEnabled()) {
LOG.info(context.toString());
}
};
}
}
Check Consumer bean replaced the @StreamListener and the @Input.
Regarding the configuration, if before in order to configure you had an application.yml looking like so:
spring:
cloud:
stream:
bindings:
input:
destination: destination
group: group
consumer:
concurrency: 10
max-attempts: 3
Now new configuration will be like
spring:
cloud:
stream:
bindings:
input-in-0:
destination: destination
group: group
consumer:
concurrency: 10
max-attempts: 3
Optional: Spring cloud stream lib creates the bean by its own. If the bean is not created. use below properties.
spring:
cloud:
stream:
function:
definition: input
The in and out corresponds to the type of binding (such as input or output). The index is the index of the input or output binding. It is always 0 for typical single input/output function.
Now let consider Output channels:
public interface OutputChannels {
@Output
MessageChannel output();
}
@Service
@EnableBinding(OutputChannels.class)
class PubSubSendQueue {
OutputChannels outputChannel;
public void publish() {
outputChannel.output().send("Hello");
}
}
Now with the Functions code will be as :
@Service
class PubSubSendQueue {
@Bean
public Supplier<String> output(){
return Supplier { "Adam" }
}
}
Add below in application.properties file
spring.cloud.stream.bindings.output-out-0.destination=destination
This github repository contains a lot of examples..
https://github.com/spring-cloud/stream-applications
The official documentation explains in details how to move from imperative to functional style in spring cloud stream applications with kafka streams but is the same without it.
https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function
Please also check this post..
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model
There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and how it should be developed with functional style.
Here some more helpful information:
Sending a message
Use the org.springframework.cloud.stream.function.StreamBridge for sending messages.
Before
myDataSource.output().send(message);
After
streamBridge.send("myData-out-0", message);
Replacing a ServiceActivator
Before
@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }
After
@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
return message -> {...};
}
Do not forget to register "myCoolFunction" in the properties spring.cloud.function.definition.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With