I am using Kafka to consume messages in Java. I want to test by starting the same app multiple times on my local box. When I start up, the first time I am able to start consuming messages from the topic. When I start up a second one I get:
Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
and dont get any messages from the topic. If I try to start more of them I get the same issues.
The configuration I am using for Kafka is
spring:
kafka:
bootstrap-servers: kafka:9092
consumer:
auto-offset-reset: earliest
key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2
properties:
spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
spring.json.use.type.headers: false
listener:
missing-topics-fatal: false
I have two topics
@Configuration
public class KafkaTopics {
@Bean("alertsTopic")
public NewTopic alertsTopic() {
return TopicBuilder.name("XXX.alerts")
.compact()
.build();
}
@Bean("serversTopic")
public NewTopic serversTopic() {
return TopicBuilder.name("XXX.servers")
.compact()
.build();
}
}
And two listeners in different class files.
@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=java.lang.String",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.ServerInfo"
})
public void registerServer(
@Payload(required = false) ServerInfo serverInfo
)
@KafkaListener(topics = ALERTS_KAFKA_TOPIC,
id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafkaKey",
"spring.json.value.default.type=com.devhaus.learningjungle.db.kafka.AlertOnKafka"
})
public void processAlert(
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) AlertOnKafkaKey key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partitionId,
@Header(KafkaHeaders.OFFSET) long offset,
@Payload(required = false) AlertOnKafka alert)
The consumer group-id is mandatory, it plays a major role when it comes to scalable message consumption. To start a consumer group-id is mandatory.
The member ID is a unique identifier given to a consumer by the coordinator upon initially joining the group. When the first JoinGroup request is received in the broker, the configured max.poll.interval.ms starts ticking .
Step1: Open the Windows command prompt. Step2: Use the '-group' command as: 'kafka-console-consumer -bootstrap-server localhost:9092 -topic -group <group_name>' . Give some name to the group. Press enter.
Kafka consumers belonging to the same consumer group share a group id. The consumers in a group then divides the topic partitions as fairly amongst themselves as possible by establishing that each partition is only consumed by a single consumer from the group.
From my analysis. This is normal behaviour, you can change the log levels to exclude it.
The reason for this is if the server detects that the client can support member.id
it will give that error back to the client. This is noted in KIP-394.
The client will then reconnect back to the server with a generated member ID.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With