 

What are the best practices to partition Parquet files by timestamp in Spark?

I'm pretty new to Spark (2 days) and I'm pondering the best way to partition parquet files.

My rough plan ATM is:

  • read in the source TSV files with com.databricks.spark.csv (these have a TimestampType column)
  • write out parquet files, partitioned by year/month/day/hour
  • use these parquet files for all the queries that will occur in future

It's been ludicrously easy (kudos to the Spark devs) to get a simple version working - except for partitioning the way I'd like to. This is in Python, BTW:

input = sqlContext.read.format('com.databricks.spark.csv').load(source, schema=myschema)
input.write.partitionBy('type').format("parquet").save(dest, mode="append")

Is the best approach to map the RDD, adding new columns for year, month, day and hour, and then use partitionBy? Would we then have to manually add year/month etc. to every query? Given how elegant I've found Spark to be so far, this seems a little odd.

Thanks

Asked Jul 17 '15 by Adrian Bridgett



1 Answer

I've found a few ways to do this now; I haven't yet run performance tests over them, so caveat emptor:

First we need to create a derived DataFrame (three ways shown below) and then write it out.

1) SQL query (inline lambda UDF)

from pyspark.sql.types import IntegerType

# register a UDF that pulls the day-of-month out of a timestamp
sqlContext.registerFunction("day", lambda ts: ts.day, IntegerType())
input.registerTempTable("input")
input_ts = sqlContext.sql(
  "select day(inserted_at) AS inserted_at_day, * from input")

2) SQL query (non-inline) - very similar

def day(ts):
  return ts.day

sqlContext.registerFunction("day", day, IntegerType())
... rest as before

3) withColumn

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# the same day-of-month extraction, wrapped as a DataFrame UDF
day = udf(lambda ts: ts.day, IntegerType())
input_ts = input.withColumn('inserted_at_day', day(input.inserted_at))
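
Beyond those three: if you're on Spark 1.5 or later, a UDF isn't needed at all, since the built-in year, month, dayofmonth and hour functions cover the full year/month/day/hour layout from the question. A rough sketch, assuming the same input DataFrame and its inserted_at timestamp column; the write step below would then list all four columns in partitionBy:

from pyspark.sql.functions import year, month, dayofmonth, hour

# derive all four partition columns from the timestamp, no UDF required
input_ts = (input
  .withColumn('year', year(input.inserted_at))
  .withColumn('month', month(input.inserted_at))
  .withColumn('day', dayofmonth(input.inserted_at))
  .withColumn('hour', hour(input.inserted_at)))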

To write it out, just:

input_ts.write.partitionBy('inserted_at_day').format("parquet").save(dest, mode="append")
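
For the query side, reading the partitioned output back is a normal parquet read, and filtering on the derived column lets Spark prune the non-matching partition directories instead of scanning every file. A small sketch, assuming dest is the same path used above:

df = sqlContext.read.parquet(dest)

# the filter on the partition column skips whole inserted_at_day=... directories
df.filter(df.inserted_at_day == 15).count()
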
Answered Oct 04 '22 by Adrian Bridgett