 

Load S3 files in parallel Spark

I am successfully loading files into Spark from S3 with the following code. It works, however I notice a delay between one file and the next; they are loaded sequentially. I would like to improve this by loading them in parallel.

    // Load files that were loaded into firehose on this day
    val s3Files = spark.sqlContext.read
      .schema(schema)
      .json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" +
        job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/")
      .rdd

    // Apply the schema to the RDD; at this point we still have duplicates
    val usersDataFrame = spark.createDataFrame(s3Files, schema)

    usersDataFrame.createOrReplaceTempView("results")

    // Clean and partition by the keys to eliminate duplicates and keep the latest record
    val results = spark.sql(buildCleaningQuery(job, "results"))
    results.createOrReplaceTempView("filteredResults")
    val records = spark.sql("select count(*) from filteredResults")

I have also tried loading through the textFile() method, but then I have trouble converting the resulting RDD[String] to RDD[Row], which I need because I want to move on to Spark SQL afterwards. I am using it in the following manner:

    val s3Files = sparkContext
      .textFile("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" +
        job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/")
      .toJavaRDD()
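For reference, here is a sketch of the kind of conversion I understand should be possible without going through RDD[Row] manually, assuming Spark 2.2+ where DataFrameReader.json accepts a Dataset[String] (the `schema` and path here stand in for the ones above):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

val spark = SparkSession.builder().appName("json-load").getOrCreate()
import spark.implicits._

// Read the raw JSON lines as a Dataset[String] (path shortened for the sketch)
val lines: Dataset[String] = spark.read.textFile("s3n://bucket/path/*/")

// Parse the lines directly into a DataFrame, validating against the schema
// as it parses -- no manual RDD[String] -> RDD[Row] conversion needed.
val usersDataFrame = spark.read.schema(schema).json(lines)
usersDataFrame.createOrReplaceTempView("results")
```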

What is the ideal way to load JSON files (multiple files of around 50 MB each) into Spark? I would like to validate each record's properties against a schema, so that I can later run Spark SQL queries to clean the data.

asked Jan 24 '26 15:01 by Mez

1 Answer

What's going on is that the DataFrame is being converted into an RDD and then back into a DataFrame, which loses the partitioning information along the way.

spark
  .sqlContext
  .read
  .schema(schema)
  .json(...)
  .createOrReplaceTempView("results")

should be sufficient. The partitioning information is then still present, allowing the JSON files to be loaded concurrently.
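Putting that together with the rest of the question's pipeline, a sketch of the corrected version (using `buildCleaningQuery` and the path expression from the question as-is):

```scala
// Stay in the DataFrame API end to end: no .rdd / createDataFrame round trip,
// so Spark keeps one partition per input split and reads the S3 objects in parallel.
spark.sqlContext.read
  .schema(schema)
  .json("s3n://" + job.awsaccessKey + ":" + job.awssecretKey + "@" + job.bucketName + "/" +
    job.awss3RawFileExpression + "/" + year + "/" + monthCheck + "/" + dayCheck + "/*/")
  .createOrReplaceTempView("results")

// Deduplicate / clean exactly as before, via Spark SQL on the temp view
val results = spark.sql(buildCleaningQuery(job, "results"))
results.createOrReplaceTempView("filteredResults")
val records = spark.sql("select count(*) from filteredResults")
```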

answered Jan 27 '26 07:01 by wllmtrng


