
Error serializing message when sending to Kafka topic

I need to test a message that contains headers, so I need to use MessageBuilder, but I cannot serialize the resulting message.

I tried adding the serialization settings to the producer props, but it did not work.

Can someone help me?

This is the error:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

My test class:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, Message<String>> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, Message<String>> ("notification_topic", MessageBuilder.withPayload(payload)
            .setHeader("status", "RECEIVED")
            .setHeader("service", "MASTERCARD")
            .build()));

    Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

    Consumer<byte[], byte[]> consumer = cf.createConsumer();
    consumer.subscribe(Collections.singleton("transaction_topic"));
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
    consumer.commitSync();

    assertThat(records.count()).isEqualTo(1);
}

}

Tiago Costa asked Apr 25 '17 13:04


1 Answer

I'd say the error is obvious:

Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Your value is a GenericMessage, but StringSerializer can only work with strings.

What you need is something like a JavaSerializer. No such class exists out of the box, but it is not difficult to write:

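import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Serializer;

public class JavaSerializer implements Serializer<Object> {

    // Converts any java.io.Serializable value to bytes using Java object serialization.
    @Override
    public byte[] serialize(String topic, Object data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        // Closing the ObjectOutputStream flushes it into byteStream.
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data);
        }
        catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
        return byteStream.toByteArray();
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure.
    }

    @Override
    public void close() {
        // Nothing to release.
    }

}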

Then configure it as the value.serializer property in the producer props, in place of StringSerializer.
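Keep in mind that whatever reads the topic has to reverse the same encoding. Here is a minimal sketch of that round trip using only java.io, with no Kafka or Spring dependency. The class name JavaSerializationRoundTrip and its two methods are made up for illustration, and a plain HashMap stands in for the GenericMessage, so treat it as a rough outline rather than a finished deserializer:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Kafka or Spring.
public class JavaSerializationRoundTrip {

    // Same encoding as the serializer above: Java object serialization into a byte array.
    static byte[] toBytes(Object data) throws IOException {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data);
        }
        return byteStream.toByteArray();
    }

    // The reverse step that a matching deserializer would perform on the consumer side.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream objectStream =
                new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objectStream.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the message: a serializable map holding a payload and two headers.
        Map<String, Object> message = new HashMap<>();
        message.put("payload", "{\"cardId\" : \"11\"}");
        message.put("status", "RECEIVED");
        message.put("service", "MASTERCARD");

        byte[] bytes = toBytes(message);
        Object restored = fromBytes(bytes);

        // The restored map is equal to the original one.
        System.out.println(message.equals(restored)); // prints "true"
    }
}
```

In the test from the question, the consumer is declared as Consumer<byte[], byte[]>, so the record value arrives as raw bytes and a step like fromBytes would be needed to get the message object back.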

Artem Bilan answered Sep 19 '22 09:09