 

How to set optimal config values - trigger time, maxOffsetsPerTrigger - for Spark Structured Streaming while reading messages from Kafka?

I have a Structured Streaming application reading messages from Kafka. The total message count per day is approximately 18 billion, with a peak of 12,500,000 messages per minute. The maximum message size is 2 KB.

How do I make sure my Structured Streaming app can handle this volume and velocity of data? Basically, I just want to know how to set the optimal trigger time, maxOffsetsPerTrigger, or any other configuration that keeps the job running smoothly and lets it handle failures and restarts.

asked Oct 16 '22 by ak0817

1 Answer

You can run a Spark Structured Streaming application either in fixed-interval micro-batch mode or in continuous mode. Here are some of the options you can use for tuning streaming applications.

Kafka Configurations:

Number of partitions in Kafka:

You can increase the number of partitions in Kafka so that more consumers can read data simultaneously. Set this to an appropriate number based on the input rate and the number of Kafka brokers (bootstrap servers).
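As a sketch, the partition count of an existing topic can be increased programmatically with the Kafka AdminClient (the broker addresses, topic name, and target partition count below are illustrative, and the converters import assumes Scala 2.13):

import java.util.Properties
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "host1:port1,host2:port2")
val admin = AdminClient.create(props)

// Increase "topicName" to 48 partitions. Note: existing data is not rebalanced,
// and key-based ordering guarantees change when partitions are added.
admin.createPartitions(Map("topicName" -> NewPartitions.increaseTo(48)).asJava).all().get()
admin.close()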

Spark Streaming Configurations:

Driver and executor memory configuration:

Calculate the size of the data in each batch (number of records * size of each message) and set the driver and executor memory accordingly.
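For example, using the figures from the question and assuming a 1-minute trigger interval (the interval itself is an assumption), a back-of-the-envelope sizing looks like this:

// Rough batch sizing at peak load, assuming a 1-minute trigger
val recordsPerTrigger = 12500000L                 // peak messages per minute (from the question)
val messageSizeBytes  = 2L * 1024                 // 2 KB per message (worst case)
val batchSizeBytes    = recordsPerTrigger * messageSizeBytes
val batchSizeGB       = batchSizeBytes.toDouble / (1024 * 1024 * 1024)
// ~ 23.8 GB per trigger, to be spread across the executors' memory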

Number of executors:

Set the number of executors to the number of partitions in the Kafka topic. This increases parallelism: the number of tasks reading data simultaneously matches the number of partitions.
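A minimal sketch, assuming a topic with 48 partitions (an illustrative number); these settings are usually passed to spark-submit, and are shown on the builder here only to keep the example self-contained:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-structured-streaming")
  .config("spark.executor.instances", "48")   // one executor per Kafka partition
  .config("spark.executor.cores", "1")        // total cores == partition count
  .getOrCreate()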

Limit number of offsets:

maxOffsetsPerTrigger sets a rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topic partitions of different volume.

  val df = spark
    .readStream                                                  // streaming read (not batch .read)
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topicName")
    .option("startingOffsets", "latest")
    .option("maxOffsetsPerTrigger", "1000000")                   // cap on records read per trigger
    .load()

Recovering from Failures with Checkpointing:

In case of a failure or intentional shutdown, you can recover the progress and state of the previous query and continue where it left off. This is done using checkpointing and write-ahead logs.

finalDF
  .writeStream
  .outputMode("append")                                        // file sinks support append mode
  .format("parquet")                                           // use a fault-tolerant sink; the memory sink cannot recover from failure
  .option("path", "path/to/HDFS/output/dir")
  .option("checkpointLocation", "path/to/HDFS/checkpoint/dir")
  .start()

Trigger:

The trigger settings of a streaming query define the timing of streaming data processing: whether the query is executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
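A sketch of the available trigger modes, reusing finalDF from above (the interval values are illustrative):

import org.apache.spark.sql.streaming.Trigger

finalDF
  .writeStream
  .trigger(Trigger.ProcessingTime("1 minute"))   // fixed-interval micro-batches
  // .trigger(Trigger.Once())                    // one micro-batch, then stop
  // .trigger(Trigger.Continuous("1 second"))    // continuous processing (experimental)
  .format("parquet")
  .option("path", "path/to/HDFS/output/dir")
  .option("checkpointLocation", "path/to/HDFS/checkpoint/dir")
  .start()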

answered Oct 21 '22 by SantoshK