
How to stop spark structured streaming from listing all files in an S3 bucket every time

I have a Structured Streaming job in PySpark that does some aggregations on a file source. A Kinesis Firehose combines the data from an IoT-type application and stores it in an S3 location as one file per minute, in different folders, with the following folder structure -

s3://year/month/day/hour/

My Spark Structured Streaming job seems to stall from listing all the files available in my S3 bucket, as the listing process takes more time than the processingTime I've set. I get the following warnings, and I was wondering if there is a way to prevent this.

18/06/15 14:28:35 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 74364 milliseconds
18/06/15 14:28:42 WARN FileStreamSource: Listed 4449 file(s) in 6822.134244 ms
18/06/15 14:29:06 WARN FileStreamSource: Listed 4449 file(s) in 6478.381219 ms
18/06/15 14:30:08 WARN FileStreamSource: Listed 4450 file(s) in 8285.654031 ms
asked Jun 15 '18 by ArunK


People also ask

How do you stop structured streaming queries?

If by "gracefully" you mean that the streaming query should complete processing of data, then void stop() will not do that. It will just wait until the threads performing execution has stopped (as mentioned in the documentation).

Which property must a Spark structured streaming sink possess to ensure end to end exactly once semantics?

Exactly-once semantics are only possible if the source is replayable and the sink is idempotent.
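As a hedged illustration of those two properties: a file source is replayable and a file sink with a checkpoint is idempotent. The schema fields and local paths below are assumptions for the sketch, not from the original post:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType

spark = SparkSession.builder.appName("exactly-once-sketch").getOrCreate()

schema = StructType().add("device", StringType()).add("reading", DoubleType())

# Replayable source: the file source can re-read files after a failure.
events = spark.readStream.schema(schema).json("/tmp/input")

# Idempotent sink: the file sink plus a checkpoint lets Spark skip
# batches it has already committed, giving end-to-end exactly-once.
query = (events.writeStream
         .format("parquet")
         .option("path", "/tmp/output")
         .option("checkpointLocation", "/tmp/checkpoint")
         .start())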

Can Spark streaming read from S3?

You need to provide the path to the S3 bucket, and it will stream all the data from all the files in that bucket. Then, whenever a new file is created in the bucket, it will be streamed. If you are appending data to an existing file that has already been read, those new updates will not be read.

What is the difference between Spark streaming and structured streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. In contrast, Structured Streaming is built on the Spark SQL API for data stream processing. In the end, all the APIs are optimized using the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

What is structured streaming in spark?

This tutorial module introduces Structured Streaming, the main model for handling streaming datasets in Apache Spark. In Structured Streaming, a data stream is treated as a table that is being continuously appended. This leads to a stream processing model that is very similar to a batch processing model.

How to stream data with PySpark?

First we import the required PySpark libraries and start a SparkSession. Remember that Structured Streaming processing always requires the specification of a schema for the data in the stream. We then load our data into a streaming DataFrame by using readStream, as in the sketch below.
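A minimal sketch of those steps; the schema fields and the S3 path are illustrative assumptions, not taken from the original post:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("iot-stream").getOrCreate()

# Structured Streaming file sources require an explicit schema.
schema = (StructType()
          .add("device_id", StringType())
          .add("temperature", DoubleType())
          .add("event_time", TimestampType()))

# readStream turns the directory into a streaming DataFrame; new files
# that land under the prefix are picked up in later micro-batches.
stream_df = (spark.readStream
             .schema(schema)
             .json("s3://my-bucket/year/month/day/hour/"))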

How to use structured streaming in Databricks?

Structured Streaming tutorial

1. Load sample data. The easiest way to get started with Structured Streaming is to use an example Azure Databricks dataset available in the /databricks-datasets folder accessible within the Azure Databricks ...
2. Initialize the stream.
3. Start the streaming job.
4. Interactively query the stream.

How to create streaming DataFrames/datasets in spark?

Similar to static Datasets/DataFrames, you can use the common entry point SparkSession (Scala/Java/Python/R docs) to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as on static DataFrames/Datasets.
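For example, continuing from the stream_df in the sketch above, the same DataFrame operations used on static data apply directly to the stream; the aggregation below is just an illustration:

from pyspark.sql import functions as F

# Group and aggregate exactly as you would on a static DataFrame.
per_device = (stream_df
              .groupBy("device_id")
              .agg(F.avg("temperature").alias("avg_temperature")))

# Streaming aggregations need the "complete" or "update" output mode.
query = (per_device.writeStream
         .outputMode("complete")
         .format("console")
         .start())

query.awaitTermination()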


2 Answers

The S3 API List operation can only retrieve all object keys in a bucket that share a prefix, so it's simply impossible to list only new, unprocessed objects. The Databricks folks seem to have a solution where you set up S3 to create an SQS record when a new object is created. Spark then checks SQS for new objects and retrieves the specific objects from S3 (i.e. no listing involved). Unfortunately this connector seems to be available only on Databricks clusters and hasn't been open sourced, so if you're using, for example, EMR, you can't use it (unless of course you implement the connector yourself).

answered Nov 15 '22 by lfk


A comment in the class FileStreamSource:

// Output a warning when listing files uses more than 2 seconds.

So, to get rid of this warning, you could reduce the number of files processed every trigger:

The maxFilesPerTrigger option can be set on the file source to ensure listing takes less than 2 seconds.

The first warning means the trigger interval you have set (60000 ms) is shorter than the time the batch actually took (74364 ms). Just increase the trigger interval to get rid of it; both options are shown in the sketch below.
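A hedged sketch of both suggestions applied together, assuming a SparkSession and a schema like the ones shown earlier; the file limit of 100, the bucket name, and the output paths are assumptions:

# Cap how many files each micro-batch reads.
stream_df = (spark.readStream
             .schema(schema)
             .option("maxFilesPerTrigger", 100)
             .json("s3://my-bucket/year/month/day/hour/"))

# Use a trigger interval longer than the ~74 s batches seen in the log.
query = (stream_df.writeStream
         .format("parquet")
         .option("path", "s3://my-bucket/output/")
         .option("checkpointLocation", "s3://my-bucket/checkpoints/")
         .trigger(processingTime="120 seconds")
         .start())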

answered Nov 15 '22 by bp2010