
Spark Streaming on an S3 Directory

I have thousands of events being streamed through Amazon Kinesis into SQS and then dumped into an S3 directory. About every 10 minutes, a new text file is created with the latest data from Kinesis. I would like to set up Spark Streaming so that it streams the new files as they are dumped into S3. Right now I have:

import org.apache.spark.streaming._
val currentFileStream = ssc.textFileStream("s3://bucket/directory/event_name=accepted/")
currentFileStream.print
ssc.start()

However, Spark Streaming is not picking up the new files being dumped into S3. I think it has something to do with the file write requirements:

The files must have the same data format.
The files must be created in the dataDirectory by atomically moving or renaming them into the data directory.
Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
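
To make the "atomically moving or renaming" requirement concrete, here is a minimal sketch on a local filesystem, not on S3. The object name `AtomicDrop`, the helper `publish`, and the directory and file names are all hypothetical. The idea is to write the file in a staging directory first and then move it into the watched directory in a single step, so a directory monitor never sees a half-written file.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

// Hypothetical helper, for illustration only (local filesystem, not S3).
object AtomicDrop {
  // Writes `content` to a staging file, then moves it into `watchedDir` in one step.
  def publish(stagingDir: Path, watchedDir: Path, name: String, content: String): Path = {
    val staged = stagingDir.resolve(name + ".tmp")
    Files.write(staged, content.getBytes(StandardCharsets.UTF_8))
    // ATOMIC_MOVE needs both directories on the same filesystem;
    // it throws AtomicMoveNotSupportedException otherwise.
    Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    val root = Files.createTempDirectory("stream-demo")
    val staging = Files.createDirectory(root.resolve("staging"))
    val watched = Files.createDirectory(root.resolve("watched"))
    val out = publish(staging, watched, "events-0001.txt", "accepted\n")
    println(s"published $out")
  }
}
```

S3 has no real rename operation, so this pattern does not carry over literally. As far as I know, an S3 object only becomes visible once its upload has completed, which may matter when judging whether files "created" in the bucket meet the requirement.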

Why is Spark Streaming not picking up the new files? Is it because AWS creates the files directly in the directory instead of moving them there? How can I make sure Spark picks up the files being dumped into S3?

Brandon asked Jun 23 '15 05:06




1 Answer

To stream from an S3 bucket, you need to provide the path to the bucket, and Spark will stream the data from the files in it. Whenever a new file is created in the bucket, it will be streamed. If you append data to an existing file that has already been read, the new data will not be read.

Here is a small piece of code that works:
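
The point about appended data can be pictured with a simplified model. This is not Spark's actual implementation; `NewFileModel` and `selectNew` are hypothetical names. If a monitor only selects paths it has not seen before, a file that grows after it was first read is never selected again, while a new file name is.

```scala
// Simplified model for illustration only; not Spark's real file-selection logic.
object NewFileModel {
  // Returns the paths in `listing` that have not been seen before,
  // together with the updated set of seen paths.
  def selectNew(listing: Seq[String], seen: Set[String]): (Seq[String], Set[String]) = {
    val fresh = listing.filterNot(seen.contains)
    (fresh, seen ++ fresh)
  }

  def main(args: Array[String]): Unit = {
    val (batch1, seen1) = selectNew(Seq("part-0001.txt"), Set.empty)
    // Between batches, part-0001.txt was appended to and part-0002.txt was created.
    val (batch2, _) = selectNew(Seq("part-0001.txt", "part-0002.txt"), seen1)
    println(batch1) // List(part-0001.txt)
    println(batch2) // List(part-0002.txt)
  }
}
```

Under this model, the practical consequence is to write each batch of events to a new file name rather than appending to an existing one.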

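import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._

// Assumption: credentials come from environment variables; substitute your own source.
val myAccessKey = sys.env("AWS_ACCESS_KEY_ID")
val mySecretKey = sys.env("AWS_SECRET_ACCESS_KEY")

val conf = new SparkConf().setAppName("Simple Application").setMaster("local[*]")
val sc = new SparkContext(conf)

// Register the S3 filesystem implementation and credentials with Hadoop
val hadoopConf = sc.hadoopConfiguration
hadoopConf.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3.awsSecretAccessKey", mySecretKey)

// The fs.s3.* keys above may be deprecated; the fs.s3n.* keys match the s3n:// scheme used below
hadoopConf.set("fs.s3n.awsAccessKeyId", myAccessKey)
hadoopConf.set("fs.s3n.awsSecretAccessKey", mySecretKey)

// Check the directory for new files every 60 seconds
val ssc = new StreamingContext(sc, Seconds(60))
val lines = ssc.textFileStream("s3n://path to bucket")
lines.print()

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate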

Hope it helps.

Hafiz Mujadid answered Sep 19 '22 19:09