Understanding Spark Structured Streaming Parallelism

I'm a newbie in the Spark world and struggling with some concepts.

How does parallelism work when using Spark Structured Streaming with a Kafka source?

Let's consider the following code snippet:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .getOrCreate();

Dataset<VideoEventData> ds = spark
  .readStream()
  .format("kafka")
  ...

gDataset = ds.groupByKey(...)

pDataset = gDataset.mapGroupsWithState(
      ...
      // process each key and its values:
      //   loop over the values
      //   if a value is valid, save the key/value result to HDFS
      ...
)

StreamingQuery query = pDataset.writeStream()
          .outputMode("update")
          .format("console")
          .start();

// block until the query terminates
query.awaitTermination();

I've read that parallelism is related to the number of data partitions, and that the number of partitions for a Dataset is based on the spark.sql.shuffle.partitions parameter.

  1. For every batch (pull from Kafka), will the pulled items be divided among the spark.sql.shuffle.partitions partitions? For example, with spark.sql.shuffle.partitions=5 and Batch1=100 rows, will we end up with 5 partitions of 20 rows each?

  2. Considering the code snippet provided, do we still benefit from Spark's parallelism, given the groupByKey followed by the mapGroups/mapGroupsWithState functions?

UPDATE:

Inside gDataset.mapGroupsWithState is where I process each key and its values and store the result in HDFS. The output sink is therefore used only to print some stats to the console.
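For reference, a minimal sketch of what such a stateful processing function might look like in Java, assuming the grouping key is a String. VideoEventState, isValid(), updateWith() and saveToHdfs() are hypothetical placeholders, not the actual code:

import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.streaming.GroupState;

// Hypothetical state and helper types; the real ones are not shown in the question.
MapGroupsWithStateFunction<String, VideoEventData, VideoEventState, String> processFn =
    (String key, Iterator<VideoEventData> values, GroupState<VideoEventState> state) -> {
        VideoEventState current = state.exists() ? state.get() : new VideoEventState();
        while (values.hasNext()) {
            VideoEventData value = values.next();
            if (value.isValid()) {
                saveToHdfs(key, value);   // side effect: persist the key/value result to HDFS
            }
            current.updateWith(value);
        }
        state.update(current);            // keep the per-key state for the next trigger
        return key;                       // this is what reaches the console sink
    };

Dataset<String> pDataset = gDataset.mapGroupsWithState(
    processFn,
    Encoders.bean(VideoEventState.class),
    Encoders.STRING());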

asked Jan 13 '18 by Kleyson Rios


People also ask

How do you achieve parallelism in Spark Streaming?

(1) Increase the number of receivers: if a single receiver (a single machine) cannot read in and distribute all the incoming records, it becomes a bottleneck, so the number of receivers can be increased depending on the scenario. (2) Re-partition the received data: if increasing the number of receivers is not an option, the received data can be explicitly repartitioned to spread it across more executors.

How does Spark structured Streaming work?

Spark Structured Streaming allows near-real-time computation over streaming data using the Spark SQL engine, generating aggregates or other output according to the defined logic. This streaming data can be read from a file, a socket, or sources such as Kafka.

What is difference between Spark Streaming and structured Streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

What do you understand about Dstreams in Spark?

Discretized Stream, or DStream, is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data, either the input data stream received from a source or the processed data stream generated by transforming the input stream.


1 Answer

For every batch (pull from Kafka), will the pulled items be divided among the spark.sql.shuffle.partitions partitions?

They will be divided once they reach groupByKey, which is a shuffle boundary. When you first retrieve the data, the number of partitions will be equal to the number of Kafka partitions.
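To illustrate, a minimal sketch of the source side; the bootstrap server, topic name, and partition counts below are assumptions for the example, not taken from the question:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("myApp")
    // only affects stages AFTER a shuffle, e.g. after groupByKey
    .config("spark.sql.shuffle.partitions", "5")
    .getOrCreate();

// Suppose the topic "video-events" was created with 3 partitions.
Dataset<Row> raw = spark.readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "video-events")
    .load();   // the pre-shuffle stage runs with 3 tasks, one per Kafka partition

// After groupByKey(...), the data is redistributed into 5 partitions (spark.sql.shuffle.partitions).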

Considering the code snippet provided, do we still benefit from Spark's parallelism, given the groupByKey followed by the mapGroups/mapGroupsWithState functions?

Generally yes, but it also depends on how you set up your Kafka topic. Although not visible to you from the code, Spark will internally split each stage of the job into smaller tasks and distribute them among the available executors in the cluster. If your Kafka topic has only 1 partition, then prior to groupByKey your internal stream will contain a single partition, which won't be parallelized but executed on a single executor. As long as your Kafka partition count is greater than 1, your processing will be parallel. After the shuffle boundary, Spark will re-partition the data to contain the number of partitions specified by spark.sql.shuffle.partitions.
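If you want to see where that shuffle boundary sits, one way (a sketch, not part of the original answer) is to print the physical plan of the running query after it has processed a batch; the Exchange operator marks the shuffle introduced by groupByKey. This assumes the query from the question has already been started:

query.processAllAvailable();   // waits until all currently available input has been processed (handy in tests)
query.explain();               // prints the plan of the last trigger: look for Exchange and the stateful operator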

answered Oct 03 '22 by Yuval Itzchakov