I want to limit the rate when fetching data from Kafka. My code looks like:
df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "...") \
    .option("subscribe", "A") \
    .option("startingOffsets", '''{"A":{"0":200,"1":200,"2":200}}''') \
    .option("endingOffsets", '''{"A":{"0":400,"1":400,"2":400}}''') \
    .option("maxOffsetsPerTrigger", 20) \
    .load() \
    .cache()
However, when I call df.count(), the result is 600. What I expected is 20. Does anyone know why "maxOffsetsPerTrigger" doesn't work?
You are fetching 200 records from each of the three partitions (0, 1, 2), since each partition's range runs from offset 200 to offset 400; 3 × 200 = 600 records in total.
As the Kafka integration documentation puts it:
Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.
This means that maxOffsetsPerTrigger is a rate limit for streaming queries: each trigger (micro-batch) reads at most 20 records, but across all triggers the query still processes every record in the configured offset range (200 per partition). Your query is a batch read (spark.read), which has no triggers at all, so the option is ignored and the whole range between startingOffsets and endingOffsets is fetched in one go, hence the count of 600.
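Here is a minimal sketch of the usage where maxOffsetsPerTrigger does take effect, a streaming query (this is not the question's code): the console sink and checkpoint path are placeholder choices, and endingOffsets is dropped because it is only supported for batch queries.

# Streaming read: maxOffsetsPerTrigger caps each micro-batch at 20 offsets
# across all partitions; endingOffsets is not supported in streaming.
df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "...") \
    .option("subscribe", "A") \
    .option("startingOffsets", '''{"A":{"0":200,"1":200,"2":200}}''') \
    .option("maxOffsetsPerTrigger", 20) \
    .load()

# Placeholder sink: print each micro-batch to the console.
query = df.writeStream \
    .format("console") \
    .option("checkpointLocation", "/tmp/checkpoint") \
    .start()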
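If the goal is a batch read of exactly 20 records, narrow the offset range instead and drop maxOffsetsPerTrigger. A sketch under the assumption that offsets are contiguous (no gaps from compaction or aborted transactions); ending offsets are exclusive, so 7 + 7 + 6 = 20:

# Batch read capped by the offset range itself rather than by a trigger limit.
df = spark.read.format("kafka") \
    .option("kafka.bootstrap.servers", "...") \
    .option("subscribe", "A") \
    .option("startingOffsets", '''{"A":{"0":200,"1":200,"2":200}}''') \
    .option("endingOffsets", '''{"A":{"0":207,"1":207,"2":206}}''') \
    .load()

df.count()  # 20, provided no offsets are missing in the range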