
How to set group.id for consumer group in kafka data source in Structured Streaming?

I want to use Spark Structured Streaming to read from a secured Kafka cluster, which means I need to force a specific group.id. However, according to the documentation, this is not possible. The Databricks documentation at https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl, on the other hand, says that it is possible. Does that apply only to Azure Databricks clusters?

Also, the documentation on the master branch of the apache/spark repo, https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md, suggests that this functionality is intended to be added in a later Spark release. Are there any plans for a stable release that will allow setting the consumer group.id?

If not, are there any workarounds for setting a specific consumer group.id in Spark 2.4.0?

Panagiotis Fytas asked Mar 26 '19 10:03



1 Answer

As of v2.4.0, this is not possible.

You can check the following lines in the Apache Spark project:

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81 - generates the group.id

https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - sets it in the properties that are used to create the KafkaConsumer

On the master branch you can find a change that allows setting either a prefix or a specific group.id:

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - generates the group.id based on the group prefix (groupidprefix)

https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - sets the previously generated groupId if kafka.group.id wasn't passed in the properties
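To make the difference between the two versions concrete, here is a rough Python model of the behaviour described above. It is only a sketch, not Spark's code: the function names, the `allow_override` flag, the `spark-kafka-source` default prefix and the CRC32 stand-in for the checkpoint path hash are all assumptions made for illustration.

```python
import uuid
import zlib

# Assumed default prefix, used only for this illustration.
DEFAULT_PREFIX = "spark-kafka-source"


def unique_group_id(params, metadata_path):
    """Generate a per-query group id, optionally from a user-supplied prefix."""
    prefix = params.get("groupidprefix", DEFAULT_PREFIX)
    # Stand-in for hashing the checkpoint metadata path; Spark hashes it differently.
    path_hash = zlib.crc32(metadata_path.encode("utf-8"))
    return f"{prefix}-{uuid.uuid4()}-{path_hash}"


def resolve_group_id(params, metadata_path, allow_override):
    """Return the group id the Kafka consumer would end up with.

    allow_override=False models the v2.4.0 behaviour described above:
    the generated id is always set.
    allow_override=True models the master-branch behaviour: the generated
    id is used only if kafka.group.id wasn't passed in the properties.
    """
    generated = unique_group_id(params, metadata_path)
    if allow_override and "kafka.group.id" in params:
        return params["kafka.group.id"]
    return generated
```

With `allow_override=False`, a `kafka.group.id` entry in `params` is ignored and the generated id always wins, which is why a fixed group.id cannot be forced on 2.4.0. With `allow_override=True`, either a `kafka.group.id` or a `groupidprefix` entry changes the result.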

Bartosz Wardziński answered Sep 24 '22 05:09