Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

@EnableBinding @deprecated as of 3.1 in favor of functional programming model

I see that following annotations are depreciated for Spring Cloud Stream

@Input @Output @EnableBinding @StreamListener

Please provide examples and links to documentation as how to do it in functional way.

like image 321
meuhedet meuhedet Avatar asked Jan 05 '21 09:01

meuhedet meuhedet


People also ask

Is @EnableBinding deprecated?

This story talks about writing Spring Cloud Functions using Java Functional Program. Before that, we can write using @EnableBinding, @Input, @Output. But these annotations have been deprecated as of 3.1 in favor of the functional programming model.

Is @StreamListener deprecated?

The support for StreamListener is deprecated starting with 3.1. 0 of Spring Cloud Stream.

What is @EnableBinding?

The annotation @EnableBinding configures the application to bind the channels INPUT and OUTPUT defined within the interface Processor. Both channels are bindings that can be configured to use a concrete messaging-middleware or binder.

What is spring cloud function definition?

Spring Cloud Function is a project by the Spring team at Pivotal that provides a way to build software around units of work called functions which can be deployed in a serverless platform or a webserver.


3 Answers

Instead of working with annotation-based configuration, spring now uses detected beans of Consumer/Function/Supplier to define your streams for you. Older version the code with annotation looks like below:

interface InputChannels {
    
        @Input("input")
        SubscribableChannel input();
    }

    @EnableBinding(InputChannels.class)
    public class PubSubDemo {
    @StreamListener("input")
    public void listen() {
            
        if (LOG.isInfoEnabled()) {
            LOG.info(context.toString());
        }
    }

New version code will be like :

public class PubSubDemo {
   @Bean
   Consumer<String> input() {
    return str -> {
        
        if (LOG.isInfoEnabled()) {
            LOG.info(context.toString());
             }
        };
   }
}

Check Consumer bean replaced the @StreamListener and the @Input.

Regarding the configuration, if before in order to configure you had an application.yml looking like so:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3

Now new configuration will be like

spring:
  cloud:
    stream:
      bindings:
        input-in-0:
          destination: destination
          group: group
          consumer:
            concurrency: 10
            max-attempts: 3

Optional: Spring cloud stream lib creates the bean by its own. If the bean is not created. use below properties.

spring:
   cloud:
     stream:
        function:
           definition: input

The in and out corresponds to the type of binding (such as input or output). The index is the index of the input or output binding. It is always 0 for typical single input/output function.

Now let consider Output channels:

    public interface OutputChannels {
        @Output
        MessageChannel output();
    }

    @Service
    @EnableBinding(OutputChannels.class)
    class PubSubSendQueue {
    
    OutputChannels  outputChannel;

    public void publish() {
        outputChannel.output().send("Hello");
    }
}

Now with the Functions code will be as :

@Service
class PubSubSendQueue {
    @Bean
    public Supplier<String> output(){
        return Supplier { "Adam" }
    }
}

Add below in application.properties file

spring.cloud.stream.bindings.output-out-0.destination=destination
like image 110
Sourav Sharma Avatar answered Oct 07 '22 07:10

Sourav Sharma


This github repository contains a lot of examples..
https://github.com/spring-cloud/stream-applications

The official documentation explains in details how to move from imperative to functional style in spring cloud stream applications with kafka streams but is the same without it.

https://docs.spring.io/spring-cloud-stream/docs/3.1.0/reference/html/spring-cloud-stream.html#spring_cloud_function

Please also check this post..
https://spring.io/blog/2019/10/14/spring-cloud-stream-demystified-and-simplified

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_programming_model

There is an example of imperative code (https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.1.0/reference/html/spring-cloud-stream-binder-kafka.html#_imperative_programming_model) and how it should be developed with functional style.

like image 26
50qbits Avatar answered Oct 07 '22 07:10

50qbits


Here some more helpful information:

Sending a message

Use the org.springframework.cloud.stream.function.StreamBridge for sending messages.

Before

myDataSource.output().send(message);

After

streamBridge.send("myData-out-0", message);

Replacing a ServiceActivator

Before

@ServiceActivator(inputChannel = MyProcessor.INPUT, outputChannel = MyProcessor.OUTPUT)
public Message<MySuperOutputMessage> transform(Message<MySuperInputMessage> message) { ... }

After

@Bean
Function<Message<MySuperInputMessage>, Message<MySuperOutputMessage>> myCoolFunction() {
    return message -> {...}; 
}

Do not forget to register "myCoolFunction" in the properties spring.cloud.function.definition.

like image 2
René Winkler Avatar answered Oct 07 '22 09:10

René Winkler