 

Restarting Spark Structured Streaming Job consumes Millions of Kafka messages and dies

We have a Spark Structured Streaming application running on Spark 2.3.3.

Basically, it opens a Kafka Stream:

  kafka_stream = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "mykafka:9092") \
      .option("subscribe", "mytopic") \
      .load()

The kafka topic has 2 partitions. After that, there are some basic filtering operations, some Python UDFs and an explode() on a column, like:

   stream = apply_operations(kafka_stream)

where apply_operations does all the work on the data (a hypothetical sketch of such a function is included further below). In the end, we would like to write the stream to a sink, i.e.:

   stream.writeStream \
       .format("our.java.sink.Class") \
       .option("some-option", "value") \
       .trigger(processingTime='15 seconds') \
       .start()

To keep this stream operation running forever, we call at the end:

   spark.streams.awaitAnyTermination()
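For reference, apply_operations itself is not shown in the question; the following is only a hypothetical sketch (the column names and UDF logic are invented for illustration) of a function combining the kind of filtering, Python UDFs and explode() mentioned above:

    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, StringType

    # Hypothetical UDF; the real logic lives in the actual job.
    parse_tags = F.udf(lambda v: (v or "").split(","), ArrayType(StringType()))

    def apply_operations(kafka_stream):
        return kafka_stream \
            .selectExpr("CAST(value AS STRING) AS value") \
            .filter(F.col("value").isNotNull()) \
            .withColumn("tag", F.explode(parse_tags(F.col("value"))))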

So far, so good: everything runs for days. But due to a network problem, the job was down for a few days, and there are now millions of messages waiting in the Kafka topic to be caught up on.

When we restart the streaming job using spark-submit, the first batch is too large and takes ages to complete. We thought there might be a parameter to limit the size of the first batch, but we did not find anything that helped.

We tried:

  • spark.streaming.backpressure.enabled=true together with spark.streaming.backpressure.initialRate=2000, spark.streaming.kafka.maxRatePerPartition=1000 and spark.streaming.receiver.maxrate=2000 (how such settings are typically passed is sketched right after this list)

  • setting spark.streaming.backpressure.pid.minrate to a lower value did not have an effect either

  • setting option("maxOffsetsPerTrigger", 10000) did not have an effect either
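For reference, settings like those in the first bullet are typically passed either via spark-submit --conf flags or on the SparkSession builder; the exact submit command is not part of the question, so the script name below is a placeholder and the keys are copied from the list above:

    spark-submit \
        --conf spark.streaming.backpressure.enabled=true \
        --conf spark.streaming.backpressure.initialRate=2000 \
        --conf spark.streaming.kafka.maxRatePerPartition=1000 \
        --conf spark.streaming.receiver.maxrate=2000 \
        our_streaming_job.py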

Now, after we restart the pipeline, sooner or later the whole Spark job crashes again. We cannot simply increase the memory or the number of cores for the job.

Is there anything we missed for controlling the number of events processed in one streaming batch?

Regenschein asked Apr 02 '19
People also ask

Is there any difference between Spark Streaming and Spark structured Streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

What is the primary difference between Kafka streams and Spark Streaming?

Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.

How does Spark Streaming process the data from Kafka?

This approach uses a Receiver to receive the data. The Receiver is implemented using the Kafka high-level consumer API. As with all receivers, the data received from Kafka through a Receiver is stored in Spark executors, and jobs launched by Spark Streaming then process the data.
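As a concrete illustration of this receiver-based approach (this is the old DStream API; it needs the spark-streaming-kafka-0-8 artifact on the classpath, and the ZooKeeper address, group id and topic name below are placeholders):

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext(appName="receiver-based-kafka-example")
    ssc = StreamingContext(sc, batchDuration=15)

    # Receiver-based stream: data is pulled via the Kafka high-level consumer
    # and stored on the executors before Spark Streaming jobs process it.
    kafka_dstream = KafkaUtils.createStream(
        ssc, "zookeeper-host:2181", "my-consumer-group", {"mytopic": 1})

    kafka_dstream.pprint()
    ssc.start()
    ssc.awaitTermination()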

How does Spark structured Streaming work?

Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. Structured Streaming allows you to take the same operations that you perform in batch mode using Spark's structured APIs, and run them in a streaming fashion.
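A small sketch of that idea, with the same transformation expressed once in batch mode and once as a stream (the input path /data/events and the column name "type" are made up for illustration):

    # Batch version
    batch_df = spark.read.format("json").load("/data/events")
    batch_counts = batch_df.groupBy("type").count()
    batch_counts.show()

    # Streaming version: identical transformation, only the source and sink differ.
    stream_df = spark.readStream.format("json") \
        .schema(batch_df.schema) \
        .load("/data/events")
    stream_counts = stream_df.groupBy("type").count()
    stream_counts.writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()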

Does Spark Streaming need Kafka?

Spark Streaming is an API that can be connected with a variety of sources including Kafka to deliver high scalability, throughput, fault-tolerance, and other benefits for a high-functioning stream processing mechanism.

How to stream from Spark to Kafka?

Spark Streaming with Kafka example, in five steps:

  1. Run the Kafka producer shell. First, produce some JSON data to the Kafka topic "json_topic"; the Kafka distribution comes with a Kafka producer shell, run ...
  2. Streaming with Kafka.
  3. Spark Streaming write to console.
  4. Spark Streaming write to Kafka topic.
  5. Run the Kafka consumer shell.
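For step 4 in particular, writing a stream back to a Kafka topic with Structured Streaming looks roughly like this (broker address, topic name and checkpoint path are placeholders; the Kafka sink expects a string or binary "value" column, so all columns are serialized to JSON here):

    # df is any streaming DataFrame produced earlier in the job.
    df.selectExpr("to_json(struct(*)) AS value") \
        .writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("topic", "json_topic") \
        .option("checkpointLocation", "/tmp/checkpoints/kafka-sink") \
        .start()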

What is startingoffsets earliest in Spark Streaming?

Spark Streaming uses readStream() on SparkSession to load a streaming Dataset from Kafka. The option startingOffsets=earliest is used to read all data available in Kafka at the start of the query. We may not use this option that often, since the default value for startingOffsets is latest, which reads only new data that has not been processed yet.
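In code, that option goes on the reader, e.g. (broker and topic names as used elsewhere on this page):

    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("subscribe", "mytopic") \
        .option("startingOffsets", "earliest") \
        .load()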

What is Apache Spark Structured Streaming and how does it work?

Apache Spark Structured Streaming is a part of the Spark Dataset API. This is an improvement over the DStream-based Spark Streaming, which used the older RDD-based API instead. It aims to provide low-latency and fault-tolerant exactly-once processing. That’s quite an ambitious goal, but with a few caveats, they’ve managed it.

What is the difference between Apache Spark and Apache Kafka?

Kafka has its own stream library and is best for transforming Kafka topic-to-topic whereas Spark streaming can be integrated with almost any type of system. For more detail, you can refer to this blog.


1 Answer

You wrote in the comments that you are using spark-streaming-kafka-0-8_2.11, and that API version is not able to handle maxOffsetsPerTrigger (or, as far as I know, any other mechanism for reducing the number of consumed messages), since this was only implemented for the newer API spark-streaming-kafka-0-10_2.11. According to the documentation, this newer API also works with your Kafka version 0.10.2.2.
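To make that concrete, here is a sketch under the assumption that the job is switched to the newer connector; for the Structured Streaming readStream API used in the question this usually means the spark-sql-kafka-0-10 artifact (the coordinates below assume Spark 2.3.3 with Scala 2.11, adjust to your build). With that source in place, maxOffsetsPerTrigger caps the number of offsets read per micro-batch, which keeps the first batch after a long outage bounded:

    # Submit with the newer Kafka source on the classpath, e.g.:
    #   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3 job.py

    kafka_stream = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "mykafka:9092") \
        .option("subscribe", "mytopic") \
        .option("maxOffsetsPerTrigger", 10000) \
        .load()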

cronoik answered Oct 25 '22