I am trying to create the simplest as possible hello world with Spring Cloud + Kafka Streams + Spring Boot 2.
I realize I miss basic concepts. Basically, I understand that:
1 - I need to define an outbound stream to write messages to a Kafka topic, and an inbound stream to read messages from a Kafka topic
public interface LoansStreams {
String INPUT = "loans-in";
String OUTPUT = "loans-out";
@Input(INPUT)
SubscribableChannel inboundLoans();
@Output(OUTPUT)
MessageChannel outboundLoans();
}
2 - configure Spring Cloud Stream to bind to my streams
@EnableBinding(LoansStreams.class)
public class StreamsConfig {
}
3 - configure Kafka properties
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
loans-in:
destination: loans
contentType: application/json
loans-out:
destination: loans
contentType: application/json
4 - create model for exchange messages
@Getter @Setter @ToString @Builder
public class Loans {
private long timestamp;
private String result;
}
5 - write to Kafka
@Service
@Slf4j
public class LoansService {
private final LoansStreams loansStreams;
public LoansService(LoansStreams loansStreams) {
this.loansStreams = loansStreams;
}
public void sendLoan(final Loans loans) {
log.info("Sending loans {}", loans);
MessageChannel messageChannel = loansStreams.outboundLoans();
messageChannel.send(MessageBuilder
.withPayload(loans)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.build());
}
}
6 - listen to Kafka topic
@Component
@Slf4j
public class LoansListener {
@StreamListener(LoansStreams.INPUT)
public void handleLoans(@Payload Loans loans) {
log.info("Received results: {}", loans);
}
}
I spent a whole day reading few blogs and I assume that the above code is at least workable. I amo not sure I realy coding the best aproach as possible. By the way, I get the error mentioned in the topic:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 18:33:05.619 ERROR 14784 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'outputBindingLifecycle'; nested exception is java.lang.IllegalStateException: A default binder has been requested, but there are no binders available for 'org.springframework.cloud.stream.messaging.DirectWithAttributesChannel' : , and no default binder has been set.
Googling for solution, I found someone saying to code StreamListe returning the model so I replaced it with:
@StreamListener(LoansStreams.INPUT)
@SendTo("loans-out")
public KStream<?, Loans> process(KStream<?, Loans> l) {
log.info("Received: {}", l);
return l;
}
and then I get an error even less clear at least to me (previous error clearly mentioned some binder issue):
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-26 19:01:06.016 ERROR 13276 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
java.lang.IllegalArgumentException: Method must be declarative
at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.6.RELEASE.jar:5.1.6.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.validateStreamListenerMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:510) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.streams.KafkaStreamsStreamListenerSetupMethodOrchestrator.orchestrateStreamListenerSetupMethod(KafkaStreamsStreamListenerSetupMethodOrchestrator.java:168) ~[spring-cloud-stream-binder-kafka-streams-2.1.2.RELEASE.jar:2.1.2.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor.doPostProcess(StreamListenerAnnotationBeanPostProcessor.java:226) ~[spring-cloud-stream-2.1.2.RELEASE.jar:2.1.2.RELEASE]
In case it helps somehow, I want to evoluate this idea to apply SAGAS but it is not the focus of this question. Firstly, I need get the basic up and running.
*edited
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.mybank</groupId>
<artifactId>kafka-cloud-stream</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-cloud-stream</name>
<description>Spring Cloud Stream With Kafka</description>
<properties>
<java.version>11</java.version>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-web -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<!-- version>5.1.5.RELEASE</version-->
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
"A default binder has been requested, but there are no binders available ...", please add dependency as below.
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
You can define your default binder in application.yml (or application.properties)
spring:
cloud:
stream:
bindings:
...
default-binder: kafka
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With