How to use maxOffsetsPerTrigger in pyspark structured streaming?

I want to limit the rate at which data is fetched from Kafka. My code looks like:

df = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers",'...')\
        .option("subscribe",'A') \
        .option("startingOffsets",'''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("endingOffsets",'''{"A":{"0":400,"1":400,"2":400}}''') \
        .option("maxOffsetsPerTrigger",20) \
        .load() \
        .cache()

However, when I call df.count(), the result is 600. What I expected was 20. Does anyone know why "maxOffsetsPerTrigger" doesn't work?

asked Jun 26 '18 by 杨嘉辰

People also ask

What is structured streaming in Pyspark?

Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. Structured Streaming allows you to take the same operations that you perform in batch mode using Spark's structured APIs, and run them in a streaming fashion.
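For example, the same transformation can run in batch or streaming mode; only the read/write entry points change. A minimal sketch (the input path and the event_type column are hypothetical):

# Batch: read a static directory of JSON files and aggregate
batch_df = spark.read.json('/data/events')
batch_counts = batch_df.groupBy('event_type').count()
batch_counts.show()

# Streaming: the identical groupBy/count, but over a stream of files
stream_df = spark.readStream.schema(batch_df.schema).json('/data/events')
stream_counts = stream_df.groupBy('event_type').count()
query = stream_counts.writeStream \
        .outputMode('complete') \
        .format('console') \
        .start()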

Can the Spark Structured Streaming API be used to process graph data?

Spark SQL implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of it. The libraries built on top of these are: MLlib for machine learning, GraphFrames for graph analysis, and two APIs for stream processing: Spark Streaming and Structured Streaming.


1 Answer

You are fetching 200 records from each of the three partitions (0, 1, 2): offsets 200 through 400 give 200 records per partition, and 3 × 200 = 600 records in total.

As the Kafka integration documentation says:

Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.

This means that in a streaming query each trigger would process at most 20 records, but across all triggers you would still consume every record in the configured offset range (200 per partition). Your code, however, is a plain batch read (spark.read rather than spark.readStream): there are no triggers at all, so maxOffsetsPerTrigger is ignored and all 600 records are fetched at once.
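For comparison, here is a minimal sketch of a streaming read where maxOffsetsPerTrigger does take effect. Note that endingOffsets is not supported for streaming queries; the broker address and checkpoint path below are placeholders:

df = spark.readStream.format('kafka') \
        .option("kafka.bootstrap.servers", '...') \
        .option("subscribe", 'A') \
        .option("startingOffsets", '''{"A":{"0":200,"1":200,"2":200}}''') \
        .option("maxOffsetsPerTrigger", 20) \
        .load()

# Each micro-batch now processes at most 20 offsets across all partitions
query = df.writeStream \
        .format('console') \
        .option("checkpointLocation", '/tmp/checkpoint') \
        .start()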

answered Nov 15 '22 by dbustosp