
Spark-Streaming hangs with Kafka starting offset at earliest (Kafka 2, Spark 2.4.3)

I'm having an issue with Spark-Streaming and Kafka. While running a sample program that consumes from a Kafka topic and outputs micro-batched results to the terminal, my job seems to hang when I set the option:

df.option("startingOffsets", "earliest")

Starting the job from the latest offset works fine, results are printed to the terminal as each micro batch streams through.

I was thinking this might be a resources issue, since I'm trying to read from a topic with quite a bit of data. However, I don't seem to have memory/CPU issues (I'm running this job with a local[*] cluster). The job never really seems to start; it just hangs on the line:

19/09/17 15:21:37 INFO Metadata: Cluster ID: JFXVL24JQ3K4CEbE-VA58A

  val sc = new SparkConf().setMaster("local[*]").setAppName("spark-test")
  val streamContext = new StreamingContext(sc, Seconds(1))
  val spark = SparkSession.builder().appName("spark-test")
    .getOrCreate()

  val topic = "topic.with.alotta.data"

  // subscribe to Kafka
  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()

  // write
  df.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()
    .awaitTermination()

I'd expect to see results printed to the console, but the application just seems to hang, as I mentioned. Any thoughts? It feels like a Spark resource issue (because I'm running a local "cluster" against a topic that has a lot of data). Is there something about the nature of streaming DataFrames that I'm missing?

Ivan Miller asked Mar 04 '23 10:03



1 Answer

Writing to the console sink causes the entire output of each micro-batch to be collected in the driver's memory on every trigger. Since you're currently not limiting the size of your batches, the first batch read from the earliest offset spans the whole backlog, which means the entire contents of the topic are being accumulated in the driver. See https://spark.apache.org/docs/2.4.3/structured-streaming-programming-guide.html#output-sinks

Setting a limit on your batch sizes should fix your issue. Try adding the maxOffsetsPerTrigger option, which caps the total number of offsets processed per trigger, when reading from Kafka:

  val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "127.0.0.1:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .option("maxOffsetsPerTrigger", 1000)
    .load()

See https://spark.apache.org/docs/2.4.3/structured-streaming-kafka-integration.html for details.
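
For reference, here is a minimal sketch of how the whole job might look with the limit applied. It assumes the spark-sql-kafka-0-10 package matching your Spark and Scala versions is on the classpath. The object name BoundedKafkaConsole is made up for illustration, 1000 is an arbitrary cap you would tune, and the string casts are an optional addition to make the console output readable. Note that Structured Streaming only needs a SparkSession, so the StreamingContext from the question is left out here.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object name, used only for this sketch.
object BoundedKafkaConsole {
  def main(args: Array[String]): Unit = {
    // Structured Streaming runs on a SparkSession; no StreamingContext is required.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark-test")
      .getOrCreate()

    val topic = "topic.with.alotta.data"

    val df = spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      // Cap on the total number of offsets read per trigger.
      // 1000 is an arbitrary value chosen for illustration.
      .option("maxOffsetsPerTrigger", 1000L)
      .load()
      // Kafka keys and values arrive as binary; cast them so the console shows text.
      // This assumes the topic holds string-encoded data.
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Each micro-batch now holds at most 1000 records, so the driver
    // only ever collects a small amount of data per trigger.
    df.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", "false")
      .start()
      .awaitTermination()
  }
}
```

With the cap in place, the backlog should be drained in a series of small batches instead of one batch covering the whole topic.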

andybryant answered May 04 '23 22:05
