We have a Spark Streaming Application running on Spark 2.3.3
Basically, it opens a Kafka Stream:
kafka_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "mykafka:9092") \
    .option("subscribe", "mytopic") \
    .load()
The Kafka topic has 2 partitions. After that, there are some basic filtering operations, some Python UDFs, and an explode() on a column, like:
stream = apply_operations(kafka_stream)
where apply_operations does all the work on the data (a rough sketch of what it might look like follows the pipeline outline below). In the end, we would like to write the stream to a sink, i.e.:
stream.writeStream \
    .format("our.java.sink.Class") \
    .option("some-option", "value") \
    .trigger(processingTime='15 seconds') \
    .start()
To let this stream operation run forever, we call at the end:
spark.streams.awaitAnyTermination()
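For illustration, apply_operations has roughly this shape; the column names and the UDF here are placeholders, not our actual logic:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType

@F.udf(returnType=ArrayType(StringType()))
def split_payload(value):
    # hypothetical UDF -- the real one is not shown here
    return value.split(",") if value else []

def apply_operations(kafka_stream):
    return (kafka_stream
            .selectExpr("CAST(value AS STRING) AS value")   # Kafka delivers bytes
            .filter(F.col("value").isNotNull())             # basic filtering
            .withColumn("items", split_payload("value"))    # Python UDF
            .withColumn("item", F.explode("items")))        # explode() on a column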
So far, so good. Everything runs for days. But due to a network problem, the job was down for a few days, and there are now millions of messages waiting in the Kafka topic to be caught up.
When we restart the streaming job with spark-submit, the first batch is far too large and takes ages to complete. We thought there might be a parameter to limit the size of this first batch, but we did not find anything that helped.
We tried:

- spark.streaming.backpressure.enabled=true together with spark.streaming.backpressure.initialRate=2000, spark.streaming.kafka.maxRatePerPartition=1000, and spark.streaming.receiver.maxRate=2000
- setting spark.streaming.backpressure.pid.minRate to a lower value, which did not have an effect either
- setting option("maxOffsetsPerTrigger", 10000), which did not have an effect either
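Concretely, those attempts looked roughly like this (the app name is a placeholder, and where exactly each setting was applied is simplified here):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("our-streaming-job") \
    .config("spark.streaming.backpressure.enabled", "true") \
    .config("spark.streaming.backpressure.initialRate", "2000") \
    .config("spark.streaming.kafka.maxRatePerPartition", "1000") \
    .config("spark.streaming.receiver.maxRate", "2000") \
    .getOrCreate()

kafka_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "mykafka:9092") \
    .option("subscribe", "mytopic") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

None of these changed the size of the first batch.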
Now, after we restart the pipeline, sooner or later the whole Spark job crashes again. We cannot simply give the job more memory or cores.
Is there anything we missed to control the number of events being processed in one stream batch?
Spark Streaming receives real-time data and divides it into small batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.
Kafka (with Kafka Streams) processes events as they arrive, i.e. it uses a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch approach, which divides the incoming stream into small batches for processing.
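In Structured Streaming this difference shows up in the trigger: the default mode fires micro-batches at an interval, and Spark 2.3 also added an experimental continuous trigger that only supports simple, map-like queries. A minimal sketch (the console sink is used here just for illustration):

# micro-batch mode: one batch roughly every 15 seconds
query = stream.writeStream \
    .format("console") \
    .trigger(processingTime='15 seconds') \
    .start()

# experimental continuous mode (Spark 2.3+), only for simple map-like queries:
# .trigger(continuous='1 second')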
The receiver-based approach uses a Receiver to pull the data. The Receiver is implemented with the Kafka high-level consumer API. As with all receivers, the data received from Kafka is stored on Spark executors, and the jobs launched by Spark Streaming then process that data.
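That receiver-based path belongs to the old DStream API; a minimal sketch of what it looks like (the ZooKeeper address and consumer group below are placeholders):

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

ssc = StreamingContext(spark.sparkContext, batchDuration=15)  # 15-second micro-batches

# receiver-based stream via the Kafka high-level consumer (spark-streaming-kafka-0-8)
dstream = KafkaUtils.createStream(ssc,
                                  zkQuorum="myzookeeper:2181",
                                  groupId="our-consumer-group",
                                  topics={"mytopic": 1})

dstream.map(lambda kv: kv[1]).pprint()  # the stream yields (key, value) pairs
ssc.start()
ssc.awaitTermination()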
Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. It lets you take the same operations that you perform in batch mode with Spark's structured APIs and run them in a streaming fashion.
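For example, the same column logic can be written once and applied to a batch DataFrame and a streaming DataFrame alike (the path, format, and column names are made up):

from pyspark.sql import functions as F

def enrich(df):
    # identical transformation for batch and streaming DataFrames
    return df.withColumn("value_upper", F.upper(F.col("value")))

batch_df = enrich(spark.read.json("/data/events"))

stream_df = enrich(spark.readStream
                        .schema(batch_df.schema)  # streaming file sources need an explicit schema
                        .json("/data/events"))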
Spark Streaming can be connected to a variety of sources, including Kafka, and provides scalability, high throughput, and fault tolerance for stream processing.
Structured Streaming uses readStream() on a SparkSession to load a streaming Dataset from Kafka. The option startingOffsets=earliest reads all data already available in Kafka at the start of the query; we may not use this option that often, and the default value, latest, reads only new data that has not been processed yet.
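Applied to the reader from the question above, that option looks like this (shown with earliest; leaving it out means latest):

kafka_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "mykafka:9092") \
    .option("subscribe", "mytopic") \
    .option("startingOffsets", "earliest") \
    .load()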
Apache Spark Structured Streaming is part of the Spark Dataset API. This is an improvement over DStream-based Spark Streaming, which used the older RDD-based API. It aims to provide low-latency, fault-tolerant, exactly-once processing. That is quite an ambitious goal, but with a few caveats they have managed it.
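One of those caveats: end-to-end exactly-once requires a replayable source (Kafka qualifies), a checkpoint location, and an idempotent or transactional sink. The checkpoint part is just an option on the writer; the path below is a placeholder:

stream.writeStream \
    .format("our.java.sink.Class") \
    .option("checkpointLocation", "/checkpoints/our-streaming-job") \
    .trigger(processingTime='15 seconds') \
    .start()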
Kafka has its own streams library (Kafka Streams) and is best suited for transforming data from one Kafka topic to another, whereas Spark Streaming can be integrated with almost any type of system.
You wrote in the comments that you are using spark-streaming-kafka-0-8_2.11, and that API version is not able to handle maxOffsetsPerTrigger (or, as far as I know, any other mechanism to reduce the number of consumed messages), since it was only implemented for the newer API, spark-streaming-kafka-0-10_2.11. According to the documentation, this newer API also works with your Kafka version 0.10.2.2.
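In other words, once the newer Kafka integration is on the classpath, maxOffsetsPerTrigger should actually cap the batch size. A rough sketch (for the readStream-based code above, the matching artifact is spark-sql-kafka-0-10_2.11, with the version chosen to match Spark 2.3.3; the script name is a placeholder):

# spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.3 our_job.py
kafka_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "mykafka:9092") \
    .option("subscribe", "mytopic") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()
# each micro-batch now reads at most about 10000 offsets in total,
# split proportionally across the topic's partitions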