
How to continuously monitor a directory by using Spark Structured Streaming

I want Spark to continuously monitor a directory and read CSV files with spark.readStream as soon as a file appears in that directory.

Please don't post a Spark Streaming solution; I am looking for a way to do it with Spark Structured Streaming.

Asked Sep 13 '17 by Naman Agarwal


People also ask

Does Spark Streaming run continuously?

Spark Streaming provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.

What is the difference between Spark Streaming and Structured Streaming?

Spark Streaming receives real-time data and divides it into smaller batches for the execution engine. Structured Streaming, in contrast, is built on the Spark SQL API for data stream processing. In the end, both APIs are optimized by the Spark Catalyst optimizer and translated into RDDs for execution under the hood.

How does Spark Structured Streaming work?

Structured Streaming is a high-level API for stream processing that became production-ready in Spark 2.2. Structured Streaming allows you to take the same operations that you perform in batch mode using Spark's structured APIs, and run them in a streaming fashion.
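
For example, here is a rough sketch of that idea (the schema and path are placeholders, and spark is assumed to be an existing SparkSession, e.g. from spark-shell): the same structured-API transformation works on a batch DataFrame and on a streaming one.

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StructType

val userSchema = new StructType().add("name", "string").add("age", "integer")

// Batch read of the directory
val batchDF  = spark.read.schema(userSchema).csv("/path/to/directory")
// Streaming read of the same directory
val streamDF = spark.readStream.schema(userSchema).csv("/path/to/directory")

// Identical transformation logic in both modes
val adultsBatch  = batchDF.filter(col("age") >= 18)
val adultsStream = streamDF.filter(col("age") >= 18)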

Can the Spark Structured Streaming API be used to process graph data?

Spark SQL implements the higher-level Dataset and DataFrame APIs of Spark and adds SQL support on top of it. The libraries built on top of these include MLlib for machine learning, GraphFrames for graph analysis, and two APIs for stream processing: Spark Streaming and Structured Streaming.


2 Answers

Here is the complete solution for this use case.

If you are running in standalone mode, you can increase the driver memory as:

bin/spark-shell --driver-memory 4G

There is no need to set the executor memory, since in standalone mode the executor runs within the driver.
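
The same flag also works when submitting a packaged application (the class and jar names below are only placeholders):

bin/spark-submit --driver-memory 4G --class com.example.StreamingApp target/streaming-app.jar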

Completing @T.Gaweda's solution, here it is:

import org.apache.spark.sql.types.StructType

val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)           // Specify the schema of the CSV files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")

csvDF.writeStream.format("console").option("truncate", "false").start()

Now Spark will continuously monitor the specified directory, and as soon as you add any CSV file to the directory, your DataFrame operations on csvDF will be executed on that file.
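
For reference, a minimal sketch of how the same snippet could sit inside a standalone application (the object name is hypothetical; query.awaitTermination() keeps the application alive so new files keep being processed):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

object CsvDirectoryMonitor {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("CsvDirectoryMonitor").getOrCreate()

    val userSchema = new StructType().add("name", "string").add("age", "integer")

    val csvDF = spark.readStream
      .option("sep", ";")
      .schema(userSchema)
      .csv("/path/to/directory")

    // Print every new file's rows to the console as it arrives
    val query = csvDF.writeStream
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}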

Note: If you want Spark to infer the schema, you first have to set the following configuration:

spark.sqlContext.setConf("spark.sql.streaming.schemaInference", "true")

where spark is your SparkSession.
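
A small sketch of the same setting via spark.conf.set (the header option is only an assumption about the CSV files):

// Enable streaming schema inference so Spark samples existing files
// in the directory instead of requiring an explicit schema
spark.conf.set("spark.sql.streaming.schemaInference", "true")

val inferredDF = spark.readStream
  .option("sep", ";")
  .option("header", "true")   // assumes the CSV files have a header row
  .csv("/path/to/directory")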

Answered by Naman Agarwal


As written in the official documentation, you should use the "file" source:

File source - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.

Code example taken from documentation:

// Read all the csv files written atomically in a directory
val userSchema = new StructType().add("name", "string").add("age", "integer")
val csvDF = spark
  .readStream
  .option("sep", ";")
  .schema(userSchema)      // Specify schema of the csv files
  .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
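
The "atomically placed" requirement from the quote above matters in practice: write each file somewhere else first and then move it into the monitored directory. A minimal sketch (paths are placeholders):

import java.nio.file.{Files, Paths, StandardCopyOption}

// Write the CSV to a staging location first, then move it into the
// monitored directory so Spark never sees a half-written file
Files.move(
  Paths.get("/tmp/staging/data.csv"),
  Paths.get("/path/to/directory/data.csv"),
  StandardCopyOption.ATOMIC_MOVE)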

If you don't specify a trigger, Spark will read new files as soon as possible.
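
If you do want to control how often the directory is checked, you can set a trigger on the query, for example (interval chosen arbitrarily):

import org.apache.spark.sql.streaming.Trigger

// Poll the directory for new files every 10 seconds instead of "as soon as possible"
csvDF.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()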

Answered by T. Gawęda