I am trying to find an example where I can produce and consume Avro messages with Kafka.
At this point in time, I want to use a "vanilla" Kafka deployment without any Confluent add-ons.
Is this possible? All the examples I have found so far very quickly start using Confluent-specific tools for Avro messages.
I am sure there should be a way to publish and consume Avro messages on just the Kafka platform, without any distribution-specific add-ons.
The Spring framework has great support for testing your Spring application with Apache Kafka: you can test whether your application has received data from the topic. But if your application uses Apache Avro for serialization, and you do not use the Confluent Schema Registry (which is the case when you want to use spring-kafka-test for testing), then you have a problem.
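One way around that problem is to keep the Kafka-level serializers at plain byte[] and do the Avro encoding/decoding in your own code, which works fine against spring-kafka-test's embedded broker because no schema registry is involved. The following is only an illustrative sketch under stated assumptions: it assumes a Spring Boot test, spring-kafka-test on the test classpath, a made-up topic name "users", and a hypothetical deserialize() helper like the one in the plain-Avro example further down -- none of these names come from the question.

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;

import java.util.Map;

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "users") // "users" is a made-up topic name
class AvroOverEmbeddedKafkaTest {

  @Autowired
  private EmbeddedKafkaBroker broker;

  @Test
  void receivesAvroEncodedRecord() {
    // Kafka itself only sees byte[]; no schema registry is involved anywhere.
    Map<String, Object> props = KafkaTestUtils.consumerProps("test-group", "true", broker);
    try (Consumer<String, byte[]> consumer = new DefaultKafkaConsumerFactory<>(
        props, new StringDeserializer(), new ByteArrayDeserializer()).createConsumer()) {
      broker.consumeFromAnEmbeddedTopic(consumer, "users");

      // ... trigger the application under test so it publishes an Avro-encoded message ...

      ConsumerRecord<String, byte[]> record = KafkaTestUtils.getSingleRecord(consumer, "users");
      // Decode record.value() with the Avro API yourself, e.g. via a deserialize()
      // helper like the one sketched in the plain-Avro example further down, and
      // then run your assertions on the decoded object.
    }
  }
}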
Of course you can do that without any Confluent tooling. But you have to do additional work on your side (e.g. in your application code) -- which was the original motivation of providing Avro-related tooling such as the ones from Confluent that you mentioned.
One option is to manually serialize/deserialize the payload of your Kafka messages (e.g. from YourJavaPojo to byte[]) by using the Apache Avro Java API directly. (I suppose that you implied Java as the programming language of choice.) How would this look like? On the producer side, you use the Avro API to encode your data (e.g. from YourJavaPojo to byte[]) and then use Kafka's Java producer client to write the encoded payload to a Kafka topic; on the consumer side, you read the raw byte[] payload from the topic and use the Avro API to decode it back (e.g. from byte[] to a Java pojo). You can also use the Avro API directly, of course, when working with stream processing tools like Kafka Streams (will be included in the upcoming Apache Kafka 0.10) or Apache Storm.
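To make that flow concrete, here is a minimal sketch in Java: it encodes an Avro GenericRecord to byte[] with the Avro API and sends it with Kafka's plain Java producer configured with a ByteArraySerializer. The schema, the topic name "users", the class name, and the localhost:9092 broker address are all made-up placeholders for illustration, not something taken from the question.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Properties;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainAvroKafkaSketch {

  // A made-up schema; in a real application you would load your own .avsc file
  // or use classes generated by the Avro compiler (SpecificRecord).
  private static final Schema USER_SCHEMA = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
          + "{\"name\":\"name\",\"type\":\"string\"},"
          + "{\"name\":\"age\",\"type\":\"int\"}]}");

  // Avro encode: GenericRecord -> byte[]
  static byte[] serialize(GenericRecord record) throws IOException {
    DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(record.getSchema());
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    writer.write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

  // Avro decode: byte[] -> GenericRecord; without a schema registry, producer and
  // consumer must agree on the schema out of band.
  static GenericRecord deserialize(byte[] bytes) throws IOException {
    DatumReader<GenericRecord> reader = new GenericDatumReader<>(USER_SCHEMA);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
    return reader.read(null, decoder);
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumption: a local broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", ByteArraySerializer.class.getName());

    GenericRecord user = new GenericData.Record(USER_SCHEMA);
    user.put("name", "alice");
    user.put("age", 30);

    // Kafka's plain Java producer just ships the already-encoded bytes.
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("users", "alice", serialize(user)));
    }
  }
}

On the consumer side the mirror image applies: configure a ByteArrayDeserializer, poll the records, and pass each record's value to a deserialize() helper like the one above.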
Lastly, you also have the option to use some utility libraries (whether from Confluent or elsewhere) so that you don't have to use the Apache Avro API directly. For what it's worth, I have published some slightly more complex examples at kafka-storm-starter, e.g. as demonstrated by AvroDecoderBolt.scala. Here, the Avro serialization/deserialization is done by using the Scala library Twitter Bijection. Here's an example snippet of AvroDecoderBolt.scala to give you the general idea:
// This tells Bijection how to automagically deserialize a Java type `T`,
// given a byte array `byte[]`.
implicit private val specificAvroBinaryInjection: Injection[T, Array[Byte]] =
    SpecificAvroCodecs.toBinary[T]

// Let's put Bijection to use.
private def decodeAndEmit(bytes: Array[Byte], collector: BasicOutputCollector) {
  require(bytes != null, "bytes must not be null")
  val decodeTry = Injection.invert(bytes) // <-- deserialization, using Twitter Bijection, happens here
  decodeTry match {
    case Success(pojo) =>
      log.debug("Binary data decoded into pojo: " + pojo)
      collector.emit(new Values(pojo)) // <-- Here we are telling Storm to send the decoded payload to downstream consumers
      ()
    case Failure(e) => log.error("Could not decode binary data: " + Throwables.getStackTraceAsString(e))
  }
}
So yes, you can of course opt to not use any additional libraries such as Confluent's Avro serializers/deserializers (currently made available as part of confluentinc/schema-registry) or Twitter's Bijection. Whether that's worth the additional effort is up to you to decide.