If my Kafka topic receives records like
CHANNEL | VIEWERS | .....
ABC | 100 | .....
CBS | 200 | .....
And I have Spark structured streaming code to read and process Kafka records as follows:
val spark = SparkSession
  .builder
  .appName("TestPartition")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._

val dataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers",
    "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
  .option("subscribe", "partition_test")
  .option("failOnDataLoss", "false")
  .load()
  .selectExpr("CAST(value AS STRING)")
// I will use a custom UDF to transform to a specific object
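A simplified sketch of that transformation, with testRec and the pipe-delimited parsing as placeholders (the real record type and fields may differ):

// Placeholder record type (defined at top level) and value parsing
case class testRec(channel: String, viewers: Int)

val records = dataFrame
  .as[String]
  .map { line =>
    val fields = line.split('|').map(_.trim)
    testRec(fields(0), fields(1).toInt)
  }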
Currently, I process the records using a ForeachWriter as follows:
val writer = new ForeachWriter[testRec] {
  def open(partitionId: Long, version: Long): Boolean = {
    true
  }
  def process(record: testRec): Unit = {
    handle(record)
  }
  def close(errorOrNull: Throwable): Unit = {
  }
}
val query = records.writeStream
  .format("console")
  .foreach(writer)
  .outputMode("append")
  .start()
The code works just fine. But what I would like to do is to partition the incoming data by channel, so that each worker is responsible for specific channels and I can do in-memory computations related to that channel inside the handle() block. Is that possible? If yes, how do I do that?
The code as it is applies the handle method at the record level, independent of the partition the record came from.
I see two options to ensure that all messages of the same channel will be processed on the same executor:
If you have control over the KafkaProducer producing data into the topic "partition_test", you can set the value of channel as the key of the Kafka message. By default, a KafkaProducer uses the key to determine the partition the data is written to, which ensures that all messages with the same key land in the same Kafka topic partition. Because a Spark Structured Streaming job consuming a Kafka topic matches the Kafka partitioning, your resulting dataFrame will have the same number of partitions as the Kafka topic, and all messages for the same channel will end up in the same partition.
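A minimal producer-side sketch of this, assuming plain string keys and values and reusing the brokers and topic from the question; the channel string is used as the message key:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "1.2.3.184:9092,1.2.3.185:9092,1.2.3.186:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// The channel ("ABC") is the key, so Kafka's default partitioner hashes it and
// routes every record of that channel to the same topic partition.
producer.send(new ProducerRecord[String, String]("partition_test", "ABC", "ABC|100"))
producer.close()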
As already written in the comment, you can simply repartition your dataFrame based on the values of the column channel by calling dataFrame.repartition(n, col("channel")), where n is the number of partitions. That way, all records with the same channel land in the same partition and are therefore processed on the same executor.
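As a quick sketch, assuming the value has already been parsed into a Dataset with a channel column (called records above; 10 partitions is just an example):

import org.apache.spark.sql.functions.col

// All records sharing a channel value are hashed into the same one of the 10 partitions.
val partitioned = records.repartition(10, col("channel"))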
Two important notes:
Taking ownership of partitioning (of DataFrames or of the Kafka topic) requires some extra attention, as you can end up with what is called "data skew". Data skew happens when some partitions hold far more messages than others, which negatively impacts your overall performance.
As long as you are using the foreach output sink, how your data is partitioned does not matter anyway, because you are dealing with the data at the record level. If you are looking for more control, you may rather use the foreachBatch sink (available in Spark 2.4+). The foreachBatch sink gives you access to the DataFrame of each micro-batch, on which you can perform partition-based logic with foreachPartition or mapPartitions.
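A hedged sketch of that pattern, reusing the records Dataset and the handle function from the question (the partition count and output mode are just examples):

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

val query = records.writeStream
  .outputMode("append")
  .foreachBatch { (batch: Dataset[testRec], batchId: Long) =>
    batch
      .repartition(10, col("channel"))               // co-locate each channel in one partition
      .foreachPartition { (it: Iterator[testRec]) => // runs once per partition on an executor
        it.foreach(handle)                           // per-channel in-memory work goes here
      }
  }
  .start()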