I want to use Spark Structured Streaming to read from a secure kafka. This means that I will need to force a specific group.id. However, as is stated in the documentation this is not possible. Still, in the databricks documentation https://docs.azuredatabricks.net/spark/latest/structured-streaming/kafka.html#using-ssl, it says that it is possible. Does this only refer to the azure cluster?
Also, by looking at the documentation of the master branch of the apache/spark repo https://github.com/apache/spark/blob/master/docs/structured-streaming-kafka-integration.md, we can understand that such functionality is intended to be added at later spark releases. Do you know of any plans of such a stable release, that is going to allow setting that consumer group.id?
If not, are there any workarounds for Spark 2.4.0 to be able to set a specific consumer group.id?
Spark receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the SparkSQL API for data stream processing. In the end, all the APIs are optimized using Spark catalyst optimizer and translated into RDDs for execution under the hood.
Each partition is going to be a stream of data as well and each Partition will have the data in it being ordered and each message within a Partition will get an incremental ID which is the position of the message in the Partition and that specific ID is called an Offset.
Kafka analyses the events as they unfold. As a result, it employs a continuous (event-at-a-time) processing model. Spark, on the other hand, uses a micro-batch processing approach, which divides incoming streams into small batches for processing.
Currently (v2.4.0) it is not possible.
You can check following lines in Apache Spark project:
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L81 - generate group.id
https://github.com/apache/spark/blob/v2.4.0/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L534 - set it in properties, that are used to create KafkaConsumer
In master branch you can find modification, that enable to setting prefix or particular group.id
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L83 - generate group.id based on group prefix (groupidprefix
)
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L543 - set previously generated groupId, if kafka.group.id
wasn't passed in properties
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With