
Spark Structured Streaming with Kafka - How to repartition the data and distribute the processing among worker nodes

If my Kafka topic receives records like

CHANNEL | VIEWERS | .....
ABC     |  100    | .....
CBS     |  200    | .....

And I have Spark structured streaming code to read and process Kafka records as follows:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
  .option("subscribe", "partition_test")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")
  // I will use a custom UDF to transform to a specific object

Currently, I process the records using a ForeachWriter as follows:

val writer = new ForeachWriter[testRec] {
    // Called once per partition per epoch; return true to process this partition.
    def open(partitionId: Long, version: Long): Boolean = true

    // Called for every record in the partition.
    def process(record: testRec): Unit = handle(record)

    // Called when the partition has been processed (errorOrNull is null on success).
    def close(errorOrNull: Throwable): Unit = {}
  }

  val query = dataFrame.writeStream
    .foreach(writer)        // the foreach sink defines where each record goes
    .outputMode("append")
    .start()

The code works just fine. But what I would like to do is partition the incoming data by channel, so that each worker is responsible for specific channels and I can do in-memory computations related to that channel inside the handle() block. Is that possible? If yes, how do I do that?

Asked Nov 07 '22 by cucucool

1 Answer

The code as it stands applies the handle method at the record level, independently of which partition the record belongs to.

I see two options to ensure that all messages of the same channel will be processed on the same executor:

  1. If you have control over the KafkaProducer producing data into the topic "partition_test", you can set the channel value as the key of the Kafka message. By default, a KafkaProducer uses the key to determine the partition the data is written to, so all messages with the same key land in the same Kafka topic partition. Since a Spark Structured Streaming job consuming a Kafka topic matches the Kafka partitioning, the resulting dataFrame will have the same number of partitions as the Kafka topic, and all messages of the same channel will sit in the same partition (see the producer sketch after this list).

  2. As already written in the comment, you can simply repartition your dataFrame based on the values of the column channel by calling dataFrame.repartition(n, col("channel")), where n is the number of partitions. That way, all records with the same channel land in the same partition and are therefore processed on the same executor (see the repartition sketch below).
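
For option 1, a minimal producer sketch could look like the following. The broker addresses and topic name are taken from the question; the record values are purely illustrative:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put("bootstrap.servers", "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
props.put("key.serializer", classOf[StringSerializer].getName)
props.put("value.serializer", classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)

// Using the channel as the message key: the default partitioner hashes the key,
// so every record of channel "ABC" ends up in the same Kafka partition.
producer.send(new ProducerRecord[String, String]("partition_test", "ABC", "ABC|100|..."))
producer.send(new ProducerRecord[String, String]("partition_test", "CBS", "CBS|200|..."))
producer.close()

For option 2, here is a sketch that first derives a channel column from the pipe-delimited value and then repartitions on it. The column name and the partition count of 10 are assumptions for illustration, not taken from the question:

import org.apache.spark.sql.functions.{col, split}

// Derive a "channel" column from the pipe-delimited value, then repartition on it
// so that all rows of the same channel land in the same Spark partition.
val partitionedDF = dataFrame
  .withColumn("channel", split(col("value"), "\\|").getItem(0))
  .repartition(10, col("channel"))   // 10 partitions chosen arbitrarily for this sketch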

Two important notes:

  • Taking ownership of the partitioning (of the DataFrame or of the Kafka topic) requires some extra attention, as you could end up with what is called "data skew". Data skew happens when some partitions carry a lot of messages while others carry only a few, and it negatively impacts your overall performance.

  • As long as you are using the foreach output sink, it does not really matter how your data is partitioned, because you are dealing with individual records anyway. If you are looking for more control, consider the foreachBatch sink (available in Spark 2.4+). It gives you access to the batch DataFrame of each micro-batch, on which you can perform partition-based logic with foreachPartition or mapPartitions (see the sketch below).
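
A minimal foreachBatch sketch along those lines. It assumes the channel column from the previous sketch already exists and that the question's handle method has been adapted to accept a Row; both are assumptions, not part of the original code:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.col

val query = dataFrame.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF
      .repartition(10, col("channel"))        // co-locate each channel in one partition
      .foreachPartition { rows: Iterator[Row] =>
        // All rows of a given channel arrive in this single partition, so
        // per-channel in-memory state can be kept local to this iterator.
        rows.foreach(row => handle(row))      // handle(...) assumed to take a Row here
      }
  }
  .outputMode("append")
  .start()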

Answered Nov 15 '22 by Michael Heil