What is the optimal way to read from multiple Kafka topics and write to different sinks using Spark Structured Streaming?

I am trying to write a Spark Structured Streaming job that reads from multiple Kafka topics (potentially 100s) and writes the results to different locations on S3 depending on the topic name. I've developed this snippet of code that currently reads from multiple topics and outputs the results to the console (based on a loop) and it works as expected. However, I would like to understand what the performance implications are. Would this be the recommended approach? Is it not recommended to have multiple readStream and writeStream operations? If so, what is the recommended approach?

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
asked Jun 12 '20 by Brandon


People also ask

Which method do we use for reading batch queries from Kafka?

Batch queries are written much like streaming queries, except that we use the read method instead of readStream and write instead of writeStream.
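
As a minimal sketch (the topic name, bucket path, and bootstrap_servers are placeholders taken from the question), a batch read over a bounded offset range could look like:

# Batch read: same Kafka source options as streaming, but read/write
# instead of readStream/writeStream, with explicit offset boundaries.
batch_df = spark \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", "topic_1") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

batch_df.write.format("parquet").save("s3://<MY_BUCKET>/batch_output")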

Which output mode is used to count streaming words and aggregate them with the previous data?

Streaming – Complete Output Mode. This mode is used only when you have streaming aggregated data. One example is counting words in the streaming data, aggregating them with the previous data, and outputting the results to the sink.
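
For illustration only (assuming df is the Kafka stream from the question, with value already cast to a string), a running word count in complete mode might look like:

from pyspark.sql.functions import explode, split

# Running word count over the Kafka message values; complete mode re-emits
# the full aggregated result (every word count) on each trigger.
words = df.select(explode(split(df.value, " ")).alias("word"))
word_counts = words.groupBy("word").count()

query = word_counts \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()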

Which property must a Spark Structured Streaming sink possess to ensure end-to-end exactly-once semantics?

End-to-end exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
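
In practice that means pairing a replayable source such as Kafka with a checkpoint and an idempotent sink such as the file sink. A rough sketch (bucket paths are placeholders, df is assumed to be the Kafka stream from the question):

# Kafka is a replayable source; the file sink is idempotent, so together with
# a checkpoint location this gives end-to-end exactly-once guarantees.
query = df \
    .writeStream \
    .format("parquet") \
    .option("path", "s3://<MY_BUCKET>/output/topic_1") \
    .option("checkpointLocation", "s3://<MY_BUCKET>/checkpoints/topic_1") \
    .outputMode("append") \
    .start()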


2 Answers

It's certainly reasonable to run a number of concurrent streams per driver node.

Each .start() consumes a certain amount of driver resources in Spark. Your limiting factor will be the load on the driver node and its available resources. Hundreds of topics running continuously at a high rate would need to be spread across multiple driver nodes [in Databricks there is one driver per cluster]. The advantage of Spark is, as you mention, multiple sinks and also a unified batch & streaming API for transformations.
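
For example (a minimal sketch, assuming the loop from the question has already started its queries), the driver can monitor the resulting StreamingQuery handles and block on them through the stream manager:

# After the loop has called start() for every topic, the driver holds one
# StreamingQuery per topic; block until any of them terminates (or fails).
print("Active streams: {}".format(len(spark.streams.active)))
spark.streams.awaitAnyTermination()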

The other issue will be dealing with the small writes you may end up making to S3 and file consistency. Take a look at delta.io to handle consistent & reliable writes to S3.
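
As a rough sketch only (it assumes the Delta Lake library is available on the cluster and reuses df from the question), the console sink could be swapped for a Delta table on S3:

# Delta writes are transactional, which avoids the partial/inconsistent small
# files you can otherwise end up with when streaming straight to S3.
query = df \
    .writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://<MY_BUCKET>/checkpoints/topic_1") \
    .start("s3://<MY_BUCKET>/delta/topic_1")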

answered Sep 27 '22 by Douglas M


Advantages of the approach below:

  1. Generic.
  2. Multiple threads; each thread works independently.
  3. Easier to maintain the code and support any issues.
  4. If one topic fails, there is no impact on the other topics in production; you only have to focus on the failed one.
  5. If you want to re-pull all data for a specific topic, you only have to stop the job for that topic, update or change its config, and restart that same job.

Note: the code below is not completely generic; you may need to change or tune it.

topic="" // Get value from input arguments
sink="" // Get value from input arguments

df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", topic) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", sink) \
        .start()        
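
One way to fill in those input arguments (argparse here is my assumption, not part of the original answer) would be:

import argparse

# Each per-topic job is submitted with its own topic and checkpoint/sink path, e.g.
#   spark-submit job.py --topic topic_1 --sink s3://<MY_BUCKET>/checkpoints/topic_1
parser = argparse.ArgumentParser()
parser.add_argument("--topic", required=True)
parser.add_argument("--sink", required=True)
args = parser.parse_args()

topic = args.topic
sink = args.sink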

Problems with the approach below:

  1. If one topic fails, it terminates the complete program.
  2. Limited threads.
  3. Harder to maintain the code, debug, and support any issues.
  4. If you want to re-pull all data for a specific topic from Kafka, it's not really possible, because any config change applies to all topics, which makes it a very costly operation.

my_topics = ["topic_1", "topic_2"]

for i in my_topics:
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", bootstrap_servers) \
        .option("subscribePattern", i) \
        .load() \
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    output_df = df \
        .writeStream \
        .format("console") \
        .option("truncate", False) \
        .outputMode("update") \
        .option("checkpointLocation", "s3://<MY_BUCKET>/{}".format(i)) \
        .start()
answered Sep 27 '22 by Srinivas