
Apache Spark: Using folder structures to reduce run-time of analyses

I want to optimize the run-time of a Spark application by subdividing a huge CSV file into different partitions, depending on their characteristics.

E.g. I have a column with customer IDs (integer, a), a column with dates (month+year, e.g. 01.2015, b), and a column with product IDs (integer, c) (and more columns with product-specific data, which are not needed for the partitioning).

I want to build a folder structure like /customer/a/date/b/product/c. When a user wants to know information about products from customer X, sold in January 2016, he could load and analyse the file saved in /customer/X/date/01.2016/*.

Is there a possibility to load such folder structures via wildcards? It should also be possible to load all customers or products of a specific time range, e.g. 01.2015 to 09.2015. Is it possible to use wildcards like /customer/*/date/*.2015/product/c? Or how else could a problem like this be solved?

I want to partition the data once and later load only the specific files in the analyses, to reduce the run-time of these jobs (disregarding the additional work for the partitioning).

SOLUTION: Working with Parquet files

I changed my Spark application to save my data to Parquet files; now everything works fine and I can pre-select the data by specifying the folder structure. Here is my code snippet:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

JavaRDD<Article> goodRdd = ...

SQLContext sqlContext = new SQLContext(sc);

// Schema: the partition column (keyStore) plus the payload column (textArticle)
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));

StructType schema = DataTypes.createStructType(fields);

// Convert the RDD of Articles into an RDD of Rows matching the schema
JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
    public Row call(Article article) throws Exception {
        return RowFactory.create(article.getKeyStore(), article.getTextArticle());
    }
});

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);

// WRITE PARQUET FILES, partitioned by the first column (keyStore)
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");

// READ PARQUET FILES from a single partition directory (keyStore = 1)
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");

System.out.println("READ : " + read.count());

IMPORTANT

Don't try this out with a table that has only one column! You will get an exception when you call the partitionBy method, because partitioning by every column would leave no data columns to write.
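To make the pitfall concrete, here is a minimal sketch of the failing case, reusing the sqlContext and imports from the snippet above (singleColumnRows is a hypothetical RDD of one-field rows; the exact exception message depends on the Spark version):

JavaRDD<Row> singleColumnRows = ...   // hypothetical RDD of rows with a single field

// A schema with a single column -- the same column we want to partition by
List<StructField> singleField = new ArrayList<StructField>();
singleField.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
StructType singleColumnSchema = DataTypes.createStructType(singleField);

DataFrame singleColumnDf = sqlContext.createDataFrame(singleColumnRows, singleColumnSchema);

// This throws: every column would become a partition column,
// leaving no data columns to write into the Parquet files.
singleColumnDf.write().partitionBy("keyStore").parquet("hdfs://hdfs-master:8020/user/test/parquet_single/");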

Asked Jun 14 '16 by D. Müller


People also ask

What is difference between bucketing and partitioning in Spark?

Bucketing is similar to partitioning, but while partitioning creates a directory for each partition value, bucketing distributes data across a fixed number of buckets using a hash of the bucket column's value. Tables can be bucketed on more than one column, and bucketing can be used with or without partitioning.
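As a rough sketch of the difference (assuming Spark 2.x+, where bucketBy is available on DataFrameWriter; the input path, table name, and column names here are made up for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("partition-vs-bucket").getOrCreate();
Dataset<Row> sales = spark.read().parquet("/data/sales");   // hypothetical input

// Partitioning: one sub-directory per distinct value of "date"
sales.write().partitionBy("date").parquet("/data/sales_partitioned");

// Bucketing: rows are hashed on "customer" into a fixed number of buckets (8 here);
// bucketed output has to be saved as a table
sales.write().bucketBy(8, "customer").sortBy("customer").saveAsTable("sales_bucketed");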

When should I use partition in Spark?

Spark/PySpark partitioning is a way to split the data into multiple partitions so that you can execute transformations on multiple partitions in parallel which allows completing the job faster. You can also write partitioned data into a file system (multiple sub-directories) for faster reads by downstream systems.
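A minimal sketch of these two kinds of partitioning (again assuming Spark 2.x+; the paths, column name, and partition count are made up):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();
Dataset<Row> df = spark.read().parquet("/data/input");   // hypothetical input

// In-memory partitioning: more partitions means more tasks that can run in parallel
Dataset<Row> repartitioned = df.repartition(200);

// On-disk partitioning: one sub-directory per "date" value, so downstream reads
// can skip everything outside the requested dates
repartitioned.write().partitionBy("date").parquet("/data/output_by_date");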

How does Spark core optimize its workflow?

Spark uses multiple executors and cores: each task handles a subset of the data, and tasks can run in parallel. So when we have multiple cores within each executor (typically no more than 5 are recommended), Spark uses those extra cores to spawn extra threads, and these threads perform the tasks concurrently.
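As a configuration sketch (the application name and values are arbitrary examples, not recommendations from this answer):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setAppName("parallel-analysis")
        .set("spark.executor.instances", "4")   // number of executors (YARN)
        .set("spark.executor.cores", "5")       // up to 5 concurrent tasks per executor
        .set("spark.executor.memory", "8g");

JavaSparkContext sc = new JavaSparkContext(conf);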


1 Answer

So, in Spark you can save and read partitioned data much in the way you are looking for. However, rather than creating paths like /customer/a/date/b/product/c, Spark will use the convention /customer=a/date=b/product=c when you save data using:

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/")

When you need to read the data back in, you need to specify the basePath option like this:

sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/")

and everything following /my/base/path/ will be interpreted as columns by Spark. In the example given here, Spark would add the three columns customer, date, and product to the DataFrame. Note that you can use wildcards for any of the columns as you like.

As for reading data in a specific time range, you should be aware that Spark uses predicate push-down, so it will only actually load into memory the data that fits the criteria (as specified by some filter transformation). But if you really want to specify the range explicitly, you can generate a list of path names and pass that to the read function, like this:

val pathsInMyRange = List("/my/base/path/customer=*/date=01.2015/product=*",
                          "/my/base/path/customer=*/date=02.2015/product=*",
                          "/my/base/path/customer=*/date=03.2015/product=*",
                          ...,
                          "/my/base/path/customer=*/date=09.2015/product=*")

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*)
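Alternatively, the filter-based approach mentioned above could look like this in the Java API used in the question (a sketch; note that partition values like 01.2015 may be type-inferred as numbers unless spark.sql.sources.partitionColumnTypeInference.enabled is set to false, hence the cast to string):

import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.DataFrame;

// Read everything under the base path and filter on the partition column;
// partition pruning means only the matching date directories are actually read.
DataFrame sales2015 = sqlContext.read()
        .option("basePath", "/my/base/path/")
        .parquet("/my/base/path/")
        .filter(col("date").cast("string").endsWith(".2015"));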

Anyway, I hope this helps :)

Answered Nov 01 '22 by Glennie Helles Sindholt