
Is reading a CSV file from S3 into a Spark dataframe expected to be so slow?

I am building an application that needs to load data sets from S3. The functionality is working correctly, but the performance is surprisingly slow.

The datasets are in CSV format. There are approximately 7M records (lines) in each file, and each file is 600-700MB.

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("MyApp")
  .getOrCreate()

val df = spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(inFileName: _*)
  // inFileName is a list that currently contains 2 file names,
  // e.g. s3://mybucket/myfile1.csv

// totalRecords, badRecords and indexColProc are defined elsewhere (not shown)
val r = df.rdd.flatMap { row =>
  // Discard poorly formatted input records
  try {
    totalRecords.add(1)

    // This extracts several columns from the dataset.
    // Each tuple of indexColProc specifies the index of the column to
    // select from the input row, and a function to convert
    // the value to an Int.
    val coords = indexColProc.map { case (idx, func) => func(row.get(idx).toString) }

    List((coords(0), coords))
  } catch {
    case e: Exception =>
      badRecords.add(1)
      List()
  }
}

println("Done, row count " + r.count )

I ran this on an AWS cluster of 5 machines, each an m3.xlarge. The maximizeResourceAllocation parameter was set to true, and this was the only application running on the cluster.

I ran the application twice: the first time with 'inFileName' pointing at the files on S3, and the second time pointing at a local copy of the files in the Hadoop file system (HDFS).

When I look at the Spark history server and drill down to the job that corresponds to the final r.count action, I see that it takes 2.5 minutes when accessing the files on S3, and 18s when accessing the files locally on HDFS. I've gotten proportionally similar results when I run the same experiment on a smaller cluster or in master=local configuration.

When I copy the S3 files to the cluster using

aws s3 cp <file>

it takes only 6.5s to move one 600-700MB file, so the raw I/O of the machine instance doesn't seem to be contributing much to the slowdown.
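To put those numbers side by side, here is a rough back-of-the-envelope calculation. It assumes a midpoint of 650 MB per file and that the r.count job reads both files in full; both are simplifications for illustration, not measurements.

```scala
object ThroughputEstimate {
  def main(args: Array[String]): Unit = {
    val fileSizeMb     = 650.0  // ASSUMPTION: midpoint of the 600-700MB range
    val fileCount      = 2      // two input files, as in the code above
    val cpSeconds      = 6.5    // measured: aws s3 cp of one file
    val s3JobSeconds   = 150.0  // measured: r.count job reading from S3 (2.5 min)
    val hdfsJobSeconds = 18.0   // measured: r.count job reading from HDFS

    val totalMb = fileSizeMb * fileCount

    // Effective end-to-end throughput of each path, in MB/s
    val cpRate   = fileSizeMb / cpSeconds
    val s3Rate   = totalMb / s3JobSeconds
    val hdfsRate = totalMb / hdfsJobSeconds

    println(s"aws s3 cp:      $cpRate MB/s")
    println(s"Spark from S3:  $s3Rate MB/s")
    println(s"Spark from HDFS: $hdfsRate MB/s")
    println(s"S3 vs HDFS slowdown: ${s3JobSeconds / hdfsJobSeconds}x")
  }
}
```

Under these assumptions a plain copy moves on the order of 100 MB/s, while the Spark job reading from S3 gets through less than 10 MB/s, about 8 times slower than the same job on HDFS.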

Is this kind of slow performance when accessing S3 expected? If not, could someone please point out where I'm going wrong? If it is expected, are there other ways to do this that would have better performance? Or do I need to develop something to simply copy the files over from S3 to HDFS before the application runs?

Tim Ryan asked Sep 29 '16 00:09





1 Answer

We faced the exact same issue a couple of months ago, except that our data was 1TB, so the issue was more pronounced.

We dug into it and finally came to the following conclusion: since we had 5 instances with 30 executors each, every time a stage was scheduled, the first thing each task did was fetch data from S3. The tasks were therefore all bottlenecked on network bandwidth at the same time; then they all moved on to the compute part of the task and could contend for CPU simultaneously.

So basically because the tasks are all doing the same thing at the same time, they are always contending for the same resources.

We figured out that allowing only k tasks to run at any point lets them finish downloading quickly and move on to the compute part, after which the next set of k tasks can come in and start downloading. This way, only k tasks (as opposed to all of them) share the bandwidth at any time, and some tasks are simultaneously doing something useful on CPU or I/O without all waiting on the same common resource.
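How the limit of k was applied is not stated here. One possible way to sketch it, as an assumption rather than the exact method used, is to cap the number of task slots through standard Spark settings: each executor runs at most spark.executor.cores / spark.task.cpus tasks at once, so lowering the cores per executor (or the number of executors) lowers the number of tasks downloading from S3 at the same time. The values below are hypothetical and would need tuning by experiment.

```scala
import org.apache.spark.sql.SparkSession

object ThrottledS3Read {
  def main(args: Array[String]): Unit = {
    // HYPOTHETICAL values, chosen only for illustration.
    val executors        = 5 // e.g. one executor per instance
    val coresPerExecutor = 2 // concurrent tasks per executor

    // Total concurrent tasks k = executors * coresPerExecutor
    // (with the default spark.task.cpus = 1).
    val spark = SparkSession
      .builder()
      .appName("MyApp")
      .config("spark.executor.instances", executors.toString)
      .config("spark.executor.cores", coresPerExecutor.toString)
      .getOrCreate()

    // Same read as in the question; the path is the example bucket from the post.
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("s3://mybucket/myfile1.csv")

    println("Done, row count " + df.count())
    spark.stop()
  }
}
```

Executor settings like these only take effect if they are set before the Spark context starts, and spark.executor.instances may be overridden if dynamic allocation is enabled on the cluster.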

Hope this helps.

Sachin Tyagi answered Oct 13 '22 01:10
