Spark Structured Streaming with Kafka doesn't honor startingOffsets="earliest"

I've set up Spark Structured Streaming (Spark 2.3.2) to read from Kafka (2.0.0). I'm unable to consume from the beginning of the topic if messages entered the topic before the Spark streaming job was started. Is it expected behavior for Spark streaming to ignore Kafka messages produced before the initial run of the streaming job (even with .option("stratingOffsets","earliest"))?

Steps to reproduce

  1. Before starting the streaming job, create a test topic (single broker, single partition) and produce messages to it (3 messages in my example).

  2. Start spark-shell with the following command: spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11: --repositories http://repo.hortonworks.com/content/repositories/releases/

  3. Execute the Spark Scala code below.

// Local
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("subscribe", "test")
  .option("stratingOffsets", "earliest")
  .load()

// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
  .trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
  .start()

Expected vs actual output

I expect the stream to start from offset=1. However, it starts reading from offset=3. You can see that the Kafka client is actually resetting the starting offset:

2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.

I can see that the Spark stream processes messages that I produce after starting the streaming job.

Is it expected behavior for Spark streaming to ignore Kafka messages produced before the initial run of the streaming job (even with .option("stratingOffsets","earliest"))?

2019-06-18 21:22:57 INFO  AppInfoParser:109 - Kafka version :
2019-06-18 21:22:57 INFO  AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO  MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO  Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO  AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO  ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO  AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO  AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO  ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO  KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO  Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO  MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO  KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}

Spark Batch mode

I was able to confirm that batch mode reads from the beginning, so there is no issue with the Kafka retention configuration.

// Batch read of the same topic
val df = spark.read.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9097")
  .option("subscribe", "test")
  .load()

df.count // Long = 3
Daniel Ahn asked Jun 19 '19 02:06

1 Answer

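Haha, it was a simple typo: "stratingOffsets" should be "startingOffsets". The misspelled option was accepted without any error, so the streaming query fell back to its default starting position of "latest" (hence the initial offset of 3). Batch queries default to "earliest", which is why batch mode still read all 3 messages.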
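
Since the misspelled key produced no error or warning in the logs above, one way to catch this kind of mistake earlier is to check option names before handing them to the reader. The following is only a minimal sketch: `KafkaSourceOptions`, `unrecognized`, and the `known` list are hypothetical names of my own, not part of Spark, and the list is a partial, hand-maintained set that you would need to extend for your Spark version. It assumes that `kafka.`-prefixed keys are meant for the Kafka consumer and that option names are matched case-insensitively.

```scala
object KafkaSourceOptions {
  // Hypothetical, partial allow-list of Spark Kafka source option names (lower-cased).
  // This is an assumption for illustration; extend it for your Spark version.
  private val known: Set[String] = Set(
    "subscribe", "subscribepattern", "assign",
    "startingoffsets", "endingoffsets",
    "failondataloss", "maxoffsetspertrigger"
  )

  /** Returns the keys that are neither `kafka.`-prefixed nor in the allow-list. */
  def unrecognized(options: Map[String, String]): Set[String] =
    options.keySet.filterNot { key =>
      val k = key.toLowerCase
      k.startsWith("kafka.") || known.contains(k)
    }
}

object KafkaSourceOptionsDemo {
  def main(args: Array[String]): Unit = {
    // Options as written in the question, including the misspelled key.
    val fromQuestion = Map(
      "kafka.bootstrap.servers" -> "localhost:9097",
      "subscribe" -> "test",
      "stratingOffsets" -> "earliest"
    )
    // Same options with the key spelled correctly.
    val corrected = fromQuestion - "stratingOffsets" + ("startingOffsets" -> "earliest")

    println(KafkaSourceOptions.unrecognized(fromQuestion)) // Set(stratingOffsets)
    println(KafkaSourceOptions.unrecognized(corrected))    // Set()
  }
}
```

Once the check comes back empty, the same map can be passed to the reader with `spark.readStream.format("kafka").options(corrected).load()`. Keep in mind that for streaming queries startingOffsets only applies when a new query starts; a query resumed from a checkpoint continues from where it left off.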

Daniel Ahn answered Oct 13 '22 02:10