I am using Kafka for a microservices project. I want to invoke an event whenever I save a record to the database. I have been looking at tutorials about Spring Cloud Stream. All of them are using @EnableBinding, @Input, @Output annotations. When I try to use them, it says they are deprecated. I am using spring initialzr. The release notes say that I should use Supplier, Consumer, and Function instead of the old methods like Input, Output, and Process.
@Bean
public Supplier<String> toUpperCase() {
return () -> {
return "hello from supplier";
};
}
When I use a Supplier like this, it generates the message every second as it is also highlighted in the tutorials. I don't want it to be published every second. I want it to be published when I want it to. It says I should invoke its get() method but I don't know how. Tutorials use deprecated functions to achieve such a functionality. How can I achieve such behavior without deprecated functions or how can I use the EnableBinder annotation without it is saying it is deprecated?
This story talks about writing Spring Cloud Functions using Java Functional Program. Before that, we can write using @EnableBinding, @Input, @Output. But these annotations have been deprecated as of 3.1 in favor of the functional programming model.
Class StreamBridgeA class which allows user to send data to an output binding.
Reactive Functions support Since Spring Cloud Function is build on top of Project Reactor there isn't much you need to do to benefit from reactive programming model while implementing Supplier , Function or Consumer .
Spring Cloud Stream is a framework for building message-driven microservice applications. Spring Cloud Stream builds upon Spring Boot and uses Spring Integration to provide connectivity to message brokers. It provides configuration of middleware, introducing the concepts of publish-subscribe, consumer groups, and partitions.
The core building blocks of Spring Cloud Stream are: Destination Binders: Components responsible to provide integration with the external messaging systems. Destination Bindings: Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).
Artifacts for various modules of spring-cloud-stream available in Maven Central repository under spring-cloud-stream-*. If you wish to contribute you can pick any issue that is currently listed or simply submit a PR with functionality that you believe would benefit the project. You can also look for issues with ideal-for-contribution label.
Destination Bindings: Bridge between the external messaging systems and application code (producer/consumer) provided by the end user. Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).
You can check my repo for a demo project at https://github.com/HabeebCycle/spring-cloud-stream-implemention
It shows how to implement cloud-stream using RabbitMQ and Kafka for both the supplier and the consumer and also end-to-end testing of both services.
for your case: In your supplier bean do something like this:
@Bean
public Supplier<DataEvent<String, User>> savedMessage() {
return () -> {
return null;
};
}
Spring provides StreamBridge in the function package that can be used to send events. Let's assume you have a service layer that saves into the database. The first thing to do is to create an autowired StreamBridge that is injected by constructor binding and use it to send your message as follows. Notice that the name of the supplier should be the binding name of your output as explained in the documentation.
private final StreamBridge stream;
private final UserRepository repo;
// Store your topic/binding name as the supplier name as follows
private static final String SUPPLIER_BINDING_NAME = "savedMessage-out-0"
public UserService(UserRepository repo, StreamBridge stream) {
this.repo = repo;
this.stream = stream;
}
// Your save method
public void saveUser(User user) {
// Do some checking...
//save your record
User user = repo.save(user);
//check if user is saved or not null
//create your message event (Assuming you have a DataEvent class)
DataEvent<String, User> event = new DataEvent<>("User Saved", user);
boolean sent = stream.send(SUPPLIER_BINDING_NAME, event));
// Check the repo above for proper implementation.
}
For the consumer implementation, check my repo above.
There is also an implementation here although written in Kotlin https://piotrminkowski.com/2020/06/05/introduction-to-event-driven-microservices-with-spring-cloud-stream/
You can also check Spring recent project on GitHub here https://github.com/spring-cloud/spring-cloud-stream-samples/
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With