I need to read an encrypted message from a Kafka topic. My current code, which reads strings from the topic, looks like this:
JavaPairReceiverInputDStream<String, String> pairrdd =
KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
How should I change this code so that the encrypted data is read from the Kafka topic as an array of bytes and is not corrupted?
To read data from Kafka as <byte[], byte[]> records, you can use KafkaUtils.createDirectStream with ByteArrayDeserializer for both the key and the value, like this:
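import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
// Keep keys and values as raw bytes; no charset decoding is applied.
kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
// jssc is the existing JavaStreamingContext from the question.
JavaInputDStream<ConsumerRecord<byte[], byte[]>> pairrdd =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<byte[], byte[]>Subscribe(topics, kafkaParams)
    );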
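To see why the String-based stream is risky for encrypted payloads, here is a minimal standalone sketch. It uses neither Kafka nor Spark, and the class name ByteRoundTrip, its two helper methods and the sample bytes are made up for illustration. It assumes the String path decodes the payload as UTF-8, which is an assumption about your setup rather than something taken from your code.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class ByteRoundTrip {
    // Decodes the bytes as UTF-8 text and encodes them again,
    // roughly what happens when a binary payload is read as a String.
    public static byte[] throughString(byte[] raw) {
        String text = new String(raw, StandardCharsets.UTF_8);
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for the byte-array path: the payload is passed along unchanged.
    public static byte[] throughBytes(byte[] raw) {
        return Arrays.copyOf(raw, raw.length);
    }

    public static void main(String[] args) {
        // Hypothetical ciphertext containing byte sequences that are not valid UTF-8.
        byte[] ciphertext = {(byte) 0xC3, (byte) 0x28, (byte) 0xFF, 0x00, 0x41};

        // Invalid sequences are replaced during decoding, so the round trip differs.
        System.out.println(Arrays.equals(ciphertext, throughString(ciphertext))); // prints "false"
        // Raw bytes come back identical.
        System.out.println(Arrays.equals(ciphertext, throughBytes(ciphertext)));  // prints "true"
    }
}
```

With the direct stream above, each element is a ConsumerRecord<byte[], byte[]>, so record.value() gives you the untouched byte[] to pass to your decryption routine.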
I hope this helps!