Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)

I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/

docker-compose up works fine, and creating topics via the shell also works fine.

Now I am trying to connect to Kafka via spring-kafka:2.1.0.RELEASE.

When the Spring application starts up, it prints the correct Kafka client version:

o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d

I try to send a message like this:

kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");

Sending fails on the client side with:

UnknownServerException: The server experienced an unexpected error when processing the request

In the server console, I get the message Magic v1 does not support record headers:

Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers

Googling suggests a version conflict, but the versions seem to match (org.apache.kafka:kafka-clients:1.0.0 is on the classpath).

Any clues? Thanks!

Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending JSON via JsonSerializer triggers the error above. Here is my producer config:

@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            // list of host:port pairs used for establishing the initial connections to the Kafka cluster
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
        }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
        DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
        KafkaTemplate(producerFactory())
asked Dec 23 '17 by DerM


1 Answer

I had a similar issue. Spring Kafka's JsonSerializer (and JsonSerde) adds type-info headers (such as __TypeId__) to every record by default. Record headers are only supported by message format v2, introduced in Kafka 0.11; when the broker has to down-convert such a record to the old v1 format (for example, for a topic on an older log.message.format.version, or for a pre-0.11 consumer, as in the fetch request in the question's log), it fails with exactly this error. To prevent the issue, we need to disable adding the type-info headers.

If you are fine with the default JSON serialization, use the following (the key point here is ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
// stop JsonSerializer from adding the __TypeId__ type-info header to each record
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
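
Applied to the asker's Kotlin producer config from the question, a minimal sketch (only the last put(...) line is new):

@Bean
fun producerConfigs(): Map<String, Any> =
        HashMap<String, Any>().apply {
            put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
            put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
            put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
            // suppress the __TypeId__ header so the record stays representable as magic v1
            put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false)
        }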

But if you need a custom JsonSerializer with a specific ObjectMapper (e.g., with PropertyNamingStrategy.SNAKE_CASE), you have to disable adding the type info explicitly on the JsonSerializer itself, because Spring Kafka ignores the ADD_TYPE_INFO_HEADERS property when the serializer instance is passed to DefaultKafkaProducerFactory directly (to my mind, a design flaw in Spring Kafka):

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false); // same effect as ADD_TYPE_INFO_HEADERS = false
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
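
For reference, customObjectMapper is not shown in the answer; a hypothetical Jackson setup matching the SNAKE_CASE example (written in the question's Kotlin) could look like:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.PropertyNamingStrategy

// hypothetical mapper producing snake_case JSON field names
val customObjectMapper: ObjectMapper =
        ObjectMapper().setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE)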

Or, if you use JsonSerde (e.g., with Kafka Streams), then:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false); // second argument: false = value serde, true = key serde
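
One follow-up worth noting: with the type-info headers disabled, a JsonDeserializer on the consuming side can no longer infer the payload type from the __TypeId__ header, so it must be given the target type explicitly. A sketch in the question's Kotlin, assuming MyClass and bootstrapServers from the question and a hypothetical group id:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

@Bean
fun consumerFactory(): ConsumerFactory<String, MyClass> =
        DefaultKafkaConsumerFactory(
                mapOf<String, Any>(
                        ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
                        ConsumerConfig.GROUP_ID_CONFIG to "test-group"), // group id assumed
                StringDeserializer(),
                // target type given explicitly; there is no __TypeId__ header to read it from
                JsonDeserializer(MyClass::class.java))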
answered Nov 13 '22 by Vasyl Sarzhynskyi