I must set up a group ID for the Kafka Streams consumer that matches a strict naming convention.
I cannot find a way that works, even after following the documentation closely. Since I may still have misunderstood something, I prefer to ask here for peer review before opening a bug report on the spring-cloud-stream GitHub repository.
A similar question was asked a year ago, but it is not very exhaustive and has not been answered yet. I hope to give more insight into the problem here.
From several parts of the official documentation, I understand that this should be easy to configure in the application.yaml of my app.
The documentation states that I can use either:
- spring.cloud.stream.kafka.default.group=<value>
- spring.cloud.stream.bindings.<channelName>.group
If I set the generic Kafka property directly in spring.kafka.consumer.group-id, the parameter is explicitly ignored and I get the following WARN:
2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for 'group.id'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id
So I also tried both spring.cloud.stream.default.group and spring.cloud.stream.binding.<name>.group (note that the message says binding, without the s, and not bindings).
Edit: Based on a comment from @OlegZhurakousky, this is only a typo in the error message. I tested with and without the s without success.
I had a quick look at the stream code, and this property does seem to be the one that must be set. Their tests use, for example: --spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup.
The group ID always seems to be ignored, whichever of the aforementioned configurations I test. The group is always set to the default value, groupId=process-applicationId, as the following logs show:
2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
It is as if the group setting in application.yaml were not used at all. On the other hand, spring.cloud.stream.bindings.process-in-0.destination=my-custom-topic (that is, destination: my-custom-topic) is honored and the topic is consumed correctly (see the logs above).
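To check which group a run actually joined, the groupId can be pulled out of the Kafka client log prefix. This is a minimal sketch, assuming the `[Consumer clientId=..., groupId=...]` prefix format seen in the logs above. `GroupIdLogCheck` and `extractGroupId` are hypothetical names, not part of Spring or Kafka, and the sample line is abbreviated from the first log entry.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Spring or Kafka): extracts the effective
// consumer group from a Kafka client log line like the ones shown above.
final class GroupIdLogCheck {

    // Assumes the client log prefix "[Consumer clientId=..., groupId=...]".
    private static final Pattern GROUP_ID = Pattern.compile("groupId=([^\\]\\s,]+)");

    // Returns the group ID found in the line, or empty if the line has none.
    static Optional<String> extractGroupId(String logLine) {
        Matcher m = GROUP_ID.matcher(logLine);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Abbreviated copy of the first log line in the question.
        String line = "[Consumer clientId=process-applicationId-c433e54c-StreamThread-1-consumer, "
                + "groupId=process-applicationId] Resetting offset for partition my-custom-topic-0";
        String expected = "myCustomGroupId"; // the value configured in application.yaml
        String actual = extractGroupId(line).orElse("<none>");
        System.out.println("expected=" + expected + ", actual=" + actual
                + ", match=" + expected.equals(actual));
    }
}
```

Running it against the captured log makes the mismatch between the configured and the effective group explicit, which is handy when trying several property variants in a row.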
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.6</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.1.1</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    <version>3.2.4</version>
</dependency>
package my.custom.stuff;

import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class myKafkaStreamConsumer {

    private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);

    @Bean
    public static Consumer<KStream<String, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    logger.debug("from STREAM: Key= {} , value = {}", key, value);
                    // ...
                    // my message handling business logic
                    // ...
                });
    }
}
Here is the version of application.yaml that, IMHO, is the most compliant with the documentation and still does not work. Note that the destination is correctly used, so at least the correct channel is being configured.
spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      bindings:
        process-in-0:
          group: myCustomGroupId
          destination: "my-custom-topic"
I have tried to inject the group ID in several ways, including:
- the consumer subsection, as in spring.cloud.stream.bindings.process-in-0.consumer.group or spring.cloud.stream.bindings.process-in-0.consumer.group-id
It simply always seems to be ignored.
https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.4.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_summary_of_function_based_programming_styles_for_kafka_streams
https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#kafka-consumer-properties
Bit of a disclaimer: I'm a bit rusty on Spring, but since I've been working with Kafka for the past couple of months, I wanted to play with this too. I got it to work by doing two things:
- Use applicationId instead of group within the application properties:
spring:
  kafka:
    bootstrap-servers: localhost:29092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      kafka:
        binder:
          functions:
            process:
              applicationId: MyGroupIdUsingApplicationId
        bindings:
          process-in-0:
      bindings:
        process-in-0:
          destination: my-custom-topic
- Explicitly declare a KafkaBinderConfigurationProperties bean.
I created a working sample here for you to clone and test with if you need to: https://github.com/T-TK-Wan/SO-Spring_Cloud_Streams_Kafka_GroupId
Edit:
Just to add that I was focused only on showing that the group ID can be set and that it registers correctly. Whether using the applicationId property is the correct approach, and what side effects it has, I haven't looked into.
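For what it's worth, Kafka Streams uses its application.id as the consumer group.id, which would explain why the group property has no effect here and why the default group shows up as process-applicationId (the function bean name followed by the literal applicationId). The sketch below is only a simplified model of that resolution, not the binder's actual code. `EffectiveGroupId` and its methods are hypothetical names. The property key follows the pattern I recall from the binder documentation, which includes a `streams` segment that the YAML above does not, so treat it as an assumption and check it against your binder version.

```java
import java.util.Collections;
import java.util.Map;

// Simplified model (an assumption, not the binder's actual code) of how the
// consumer group of a Kafka Streams function ends up being chosen.
final class EffectiveGroupId {

    // Assumed property pattern; functionName is the function bean name.
    static String applicationIdKey(String functionName) {
        return "spring.cloud.stream.kafka.streams.binder.functions."
                + functionName + ".applicationId";
    }

    // Kafka Streams uses application.id as the consumer group.id, so a
    // configured applicationId wins; otherwise the generated default applies.
    static String resolve(String functionName, Map<String, String> properties) {
        String configured = properties.get(applicationIdKey(functionName));
        return (configured != null && !configured.isEmpty())
                ? configured
                : functionName + "-applicationId";
    }

    public static void main(String[] args) {
        // No applicationId configured: falls back to the generated default.
        System.out.println(resolve("process", Collections.emptyMap()));
        // applicationId configured: it becomes the effective group ID.
        System.out.println(resolve("process", Collections.singletonMap(
                applicationIdKey("process"), "MyGroupIdUsingApplicationId")));
    }
}
```

Under this model, a strict naming convention for the group would have to be applied to the applicationId value rather than to group.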