
Why does Kafka Direct Stream create a new decoder for every message?

I have a Spark Streaming app written in Java, using Spark 2.1. I am using KafkaUtils.createDirectStream to read messages from Kafka, with kryo encoders/decoders for the Kafka messages, specified through the Kafka properties key.deserializer, value.deserializer, key.serializer, and value.serializer.
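For context, the consumer-side configuration would look roughly like this (a minimal sketch; the bootstrap server and group id are placeholders, and MyKryoDeserializer is a stand-in for the asker's actual kryo-backed class, sketched further below):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("group.id", "my-streaming-app");
// Keys are plain strings; values are decoded by a custom kryo-backed deserializer.
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", MyKryoDeserializer.class);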

When Spark pulls the messages in a micro-batch, they are decoded successfully with the kryo decoder. However, I noticed that the Spark executor creates a new instance of the kryo decoder for every message read from Kafka. I verified this by putting a log statement inside the decoder constructor.
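To make the observation concrete, a deserializer like the one described might look like this (an illustrative sketch, not the asker's actual code; the constructor log is what exposes how often Spark instantiates the decoder):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;

public class MyKryoDeserializer implements Deserializer<Class1> {
    // Each deserializer instance owns its own Kryo, since Kryo is not thread-safe.
    private final Kryo kryo = new Kryo();

    public MyKryoDeserializer() {
        // Logging here reveals how often Spark constructs the decoder.
        System.out.println("MyKryoDeserializer constructed");
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure in this sketch.
    }

    @Override
    public Class1 deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (Input input = new Input(data)) {
            return kryo.readObject(input, Class1.class);
        }
    }

    @Override
    public void close() {
    }
}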

This seems weird to me. Shouldn't the same decoder instance be reused for every message in every batch?

Code where I am reading from Kafka:

// Create the direct stream: each Kafka partition maps to one Spark partition.
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));

// Unwrap each ConsumerRecord into a plain key/value pair.
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
asked Dec 15 '17 by scorpio

People also ask

What is the primary difference between Kafka Streams and Spark Streaming?

Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.
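In Spark Streaming, the micro-batch interval is fixed when the streaming context is created. For example (a minimal sketch; the app name and 5-second interval are assumptions):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

SparkConf conf = new SparkConf().setAppName("kafka-direct-stream-demo");
// Every 5 seconds, Spark cuts the incoming stream into one micro-batch (an RDD).
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));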

What is Kafka record?

Kafka stores records for topics on disk and retains that data even once consumers have read it. However, records aren't stored in one big file but are broken up into segments by partition where the offset order is continuous across segments for the same topic partition.
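Both segment size and retention are controlled per topic. As an illustration, here is a sketch that creates a topic with explicit settings via the Kafka AdminClient (the topic name, partition count, and values are assumptions):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    Map<String, String> configs = new HashMap<>();
    configs.put("segment.bytes", "104857600"); // roll a new segment every ~100 MB
    configs.put("retention.ms", "604800000");  // keep data for 7 days, read or not
    NewTopic topic = new NewTopic("events", 3, (short) 1).configs(configs);
    admin.createTopics(Collections.singleton(topic)).all().get();
}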

What does Kafka do?

Kafka is primarily used to build real-time streaming data pipelines and applications that adapt to the data streams. It combines messaging, storage, and stream processing to allow storage and analysis of both historical and real-time data.

What is Kafka Spark Streaming?

Spark Streaming is an API that can connect to a variety of sources, including Kafka, to deliver scalable, high-throughput, fault-tolerant stream processing. These properties make it well suited to ingesting live data streams and routing them reliably.


1 Answer

If we want to see how Spark fetches data from Kafka internally, we'll need to look at KafkaRDD.compute, a method implemented by every RDD that tells the framework how to, well, compute that RDD:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
    s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

What's important here is the else clause, which creates a KafkaRDDIterator. This internally has:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]

As you can see, this creates an instance of both the key decoder and the value decoder via reflection, once per underlying partition. So the decoder isn't being created per message, but per Kafka partition: with, say, 10 partitions and 100,000 messages in a batch, you'd see 10 constructor log lines per batch, not 100,000.

Why is it implemented this way? I don't know. I assume it's because constructing a key and value decoder has a negligible performance cost compared to all the other allocations happening inside Spark.

If you've profiled your app and found this to be an allocation hot-path, you could open an issue. Otherwise, I wouldn't worry about it.
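If decoder construction ever does become expensive (for example, if building the Kryo object graph takes real time), a common mitigation is to keep the heavyweight state in shared, lazily initialized storage so that each per-partition instance stays cheap. A sketch of that idea, reusing the Class1 value type from the question (ThreadLocal because Kryo instances are not thread-safe):

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;

public class CheapKryoDeserializer implements Deserializer<Class1> {
    // Shared across all deserializer instances in the JVM; one Kryo per thread,
    // because Kryo itself is not thread-safe.
    private static final ThreadLocal<Kryo> KRYO = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        kryo.register(Class1.class); // any expensive setup happens once per thread
        return kryo;
    });

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
    }

    @Override
    public Class1 deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try (Input input = new Input(data)) {
            return KRYO.get().readObject(input, Class1.class);
        }
    }

    @Override
    public void close() {
    }
}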

answered Oct 23 '22 by Yuval Itzchakov