I have written a Storm topology. I want to send tuples, serialized with an Avro schema as a byte array, to a Kafka topic.
This is how I set up the bolt:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
.fieldsGrouping(BOLT1, new Fields("key"));
And this is how I convert the record to a byte array:
Schema schema = avroObject.getSchema();
DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(ping, encoder);
encoder.flush();
byte[] message = out.toByteArray();
String key = new String(message, "UTF-8");
When I emit the tuple as follows (sending the byte array to Kafka), I don't see anything in the Kafka topic:
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
But if I instead convert the byte array to a String and send that to the Kafka topic, it works.
Something like this:
builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
.fieldsGrouping(BOLT1, new Fields("key"));
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
What am I doing wrong? How do I send a byte array to a Kafka topic using the Storm KafkaBolt?
You have this problem because your MD5 hash is incorrect.
You say that it works if you convert your byte array to a Java String: that is because the MD5 value is then consistent with the String you send.
collector.emit(tuple, new Values(Obj.hashMD5(key), key));
As you can see, the MD5 is calculated on a String parameter and you send the String that the MD5 corresponds to, so everything is consistent.
But if you send a byte array, you need to calculate the MD5 on that byte array, and the result will be a byte array as well, not a String. Your code:
collector.emit(tuple, new Values(Obj.hashMD5(key), message));
is incorrect because the MD5 does not correspond to message but to message decoded as a UTF-8 String, and that conversion is lossy (see below).
Here is a link to another question on SO about computing an MD5 correctly over a byte array:
How can I generate an MD5 hash?
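As a minimal sketch of hashing the raw bytes rather than a decoded String, the standard java.security.MessageDigest class can be used. The class name Md5OfBytes, the helper names and the sample payload are hypothetical, chosen only for illustration; Obj.hashMD5 from the question is not shown and may behave differently.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

final class Md5OfBytes {
    /** Returns the 16-byte MD5 digest of the given bytes, with no String conversion involved. */
    static byte[] md5(byte[] data) {
        try {
            return MessageDigest.getInstance("MD5").digest(data);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    /** Renders a digest as lowercase hex, e.g. for logging or for use as a String key. */
    static String toHex(byte[] digest) {
        StringBuilder sb = new StringBuilder(digest.length * 2);
        for (byte b : digest) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Stand-in for the Avro-encoded payload; these bytes are not valid UTF-8.
        byte[] message = {(byte) 0xC3, 0x28, (byte) 0xFF, 0x02};
        System.out.println(toHex(md5(message)));
    }
}
```

If the key has to stay a String, the hex form of the digest is one option, since it is derived from the bytes themselves and not from a lossy decoding of them.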
This is because converting an arbitrary byte array to a String is lossy in Java (unlike C, where a string is just a sequence of bytes): byte sequences that are not valid in the chosen encoding, here UTF-8, are replaced during decoding, so the original bytes cannot be recovered. Your data apparently contains some of these.
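The loss can be reproduced in isolation. The sketch below pushes a byte array through a UTF-8 String and back; the class name LossyRoundTrip and the sample bytes are made up for illustration and stand in for an Avro binary payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class LossyRoundTrip {
    /** Decodes the bytes as UTF-8 text and encodes that text back to bytes. */
    static byte[] throughUtf8String(byte[] data) {
        String asText = new String(data, StandardCharsets.UTF_8);
        return asText.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // 0xC3 followed by 0x28 is a malformed UTF-8 sequence, and 0xFF never occurs in UTF-8.
        byte[] binary = {(byte) 0xC3, 0x28, (byte) 0xFF, 0x02};
        byte[] ascii = "ping".getBytes(StandardCharsets.US_ASCII);

        // Malformed input is replaced while decoding, so the original bytes are gone.
        System.out.println(Arrays.equals(binary, throughUtf8String(binary))); // false
        // Pure ASCII is valid UTF-8 and survives the round trip.
        System.out.println(Arrays.equals(ascii, throughUtf8String(ascii)));   // true
    }
}
```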
So your KafkaBolt should be
KafkaBolt<byte[], byte[]>
I don't know whether sending a byte-array MD5 along with your byte array is sufficient with the Storm Kafka bolt. If it is not, you will have to use an encoding that is lossless between a byte array and a Java String, such as Base64:
Base64 Encoding in Java
You will have to convert your byte array to a Base64 string, use
KafkaBolt<String, String>
and then send the data as usual:
collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));
It also means that when you fetch the data from Kafka, it will be a Base64 String that you have to decode to get the byte array back.
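A minimal sketch of that round trip, assuming Java 8 or later so that java.util.Base64 is available. The class name Base64RoundTrip and the sample bytes are hypothetical; in the topology, encode would run in the bolt before emitting and decode in whatever consumes the topic.

```java
import java.util.Arrays;
import java.util.Base64;

final class Base64RoundTrip {
    /** Producer side: turn the binary payload into a String that is safe to send as text. */
    static String encode(byte[] data) {
        return Base64.getEncoder().encodeToString(data);
    }

    /** Consumer side: recover the original bytes from the Base64 text. */
    static byte[] decode(String text) {
        return Base64.getDecoder().decode(text);
    }

    public static void main(String[] args) {
        // Stand-in for the Avro-encoded payload; these bytes are not valid UTF-8.
        byte[] message = {(byte) 0xC3, 0x28, (byte) 0xFF, 0x02};
        String keyInBase64 = encode(message);
        // Unlike the UTF-8 String conversion, this round trip returns the exact original bytes.
        System.out.println(Arrays.equals(message, decode(keyInBase64))); // true
    }
}
```

The trade-off is size: Base64 output is roughly a third larger than the raw bytes.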
Hope that helps.