I have a Spark Streaming app written in Java, using Spark 2.1. I am using KafkaUtils.createDirectStream to read messages from Kafka. I am using a kryo encoder/decoder for the Kafka messages, specified through the Kafka properties key.serializer, value.serializer, key.deserializer and value.deserializer.
When Spark pulls the messages in a micro-batch, they are successfully decoded with the kryo decoder. However, I noticed that the Spark executor creates a new instance of the kryo decoder for every message it reads from Kafka. I checked this by putting logging inside the decoder constructor.
This seems weird to me. Shouldn't the same decoder instance be reused for every message and every batch?
Code where I am reading from Kafka:
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
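For reference, kafkaParams is built roughly like this (Class1KryoDeserializer stands in for my actual kryo-backed implementation of org.apache.kafka.common.serialization.Deserializer, and the broker address and group id are just example values):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.serialization.StringDeserializer;

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");              // example broker
    kafkaParams.put("group.id", "my-streaming-app");                     // example consumer group
    kafkaParams.put("key.deserializer", StringDeserializer.class);       // keys are plain strings
    kafkaParams.put("value.deserializer", Class1KryoDeserializer.class); // my kryo-backed deserializer
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);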
Kafka (with Kafka Streams) processes events as they arrive, i.e. it uses a continuous, event-at-a-time processing model. Spark, on the other hand, uses a micro-batch approach: it divides the incoming stream into small batches and processes each batch as a unit.
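To make the micro-batch side concrete: in Spark Streaming the batch size is fixed when the streaming context is created, and each interval's worth of records fetched from Kafka becomes one RDD. A minimal sketch (the application name and the 5-second interval are arbitrary examples):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Every 5 seconds, the records received in that window are turned into one micro-batch (one RDD).
    SparkConf conf = new SparkConf().setAppName("kafka-kryo-demo");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));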
Kafka stores records for topics on disk and retains that data even after consumers have read it. The records are not kept in one big file, though: each topic partition is broken up into segments, and the offset order stays continuous across the segments of a given partition.
Kafka is primarily used to build real-time streaming data pipelines and applications that react to those streams. It combines messaging, storage, and stream processing, which allows both historical and real-time data to be stored and analysed.
Spark Streaming is an API that can be connected to a variety of sources, including Kafka, and provides the scalability, throughput, and fault tolerance needed for processing live data streams reliably.
If we want to see how Spark fetches data from Kafka internally, we'll need to look at KafkaRDD.compute, which is a method implemented for every RDD that tells the framework how to, well, compute that RDD:
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
What's important here is the else clause, which creates a KafkaRDDIterator. This internally has:
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]
As you can see, this creates an instance of both the key decoder and the value decoder via reflection, for each underlying partition. In other words, the decoders aren't created per message but per Kafka partition.
Why is it implemented this way? I don't know. I'm assuming it's because constructing a key and value decoder should have a negligible performance cost compared to all the other allocations happening inside Spark.
If you've profiled your app and found this to be an allocation hot path, you could open an issue. Otherwise, I wouldn't worry about it.
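If it ever does show up in a profile, one mitigation on the application side (a sketch only, not something Spark does for you; Class1KryoDeserializer is the hypothetical value deserializer name from the question) is to keep the expensive Kryo object in a ThreadLocal, so repeated construction of the deserializer itself stays cheap:

    import java.util.Map;

    import org.apache.kafka.common.serialization.Deserializer;

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;

    public class Class1KryoDeserializer implements Deserializer<Class1> {

        // Kryo instances are not thread-safe, so keep one per thread and reuse it
        // no matter how many deserializer instances get constructed.
        private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(() -> {
            Kryo kryo = new Kryo();
            kryo.register(Class1.class); // register nested types too if your Kryo version requires it
            return kryo;
        });

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // no per-instance state to configure
        }

        @Override
        public Class1 deserialize(String topic, byte[] data) {
            if (data == null) {
                return null;
            }
            try (Input input = new Input(data)) {
                return KRYO.get().readObject(input, Class1.class);
            }
        }

        @Override
        public void close() {
            // nothing to release; the per-thread Kryo lives for the executor thread's lifetime
        }
    }

With this shape it doesn't matter much how often the deserializer object itself is created, because the costly part (the Kryo instance and its registrations) survives across constructions on the same thread.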