I'm pretty new to Spark (2 days) and I'm pondering the best way to partition parquet files.
My rough plan ATM is:
It's been ludicrously easy (kudos to the Spark devs) to get a simple version working, except for partitioning the way I'd like to. This is in Python, BTW:
input = sqlContext.read.format('com.databricks.spark.csv').load(source, schema=myschema)
input.write.partitionBy('type').format("parquet").save(dest, mode="append")
Is the best approach to map the RDD, adding new columns for year, month, day and hour, and then use partitionBy on those? And then for any query we have to manually add year/month etc. to the filters? Given how elegant I've found Spark to be so far, this seems a little odd.
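To illustrate what I mean, roughly something like this (untested; assumes the timestamp column is called inserted_at and a Spark version that has the date helpers in pyspark.sql.functions):
from pyspark.sql.functions import year, month, dayofmonth, hour

# add one derived column per partition level
with_parts = (input
    .withColumn('year', year(input.inserted_at))
    .withColumn('month', month(input.inserted_at))
    .withColumn('day', dayofmonth(input.inserted_at))
    .withColumn('hour', hour(input.inserted_at)))
with_parts.write.partitionBy('year', 'month', 'day', 'hour').format("parquet").save(dest, mode="append")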
Thanks
Understanding Spark partitioning:
1. By default, Spark/PySpark creates partitions equal to the number of CPU cores in the machine.
2. The data of each partition resides on a single machine.
3. Spark/PySpark creates one task per partition.
4. Spark shuffle operations move data from one partition to other partitions.
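A quick sketch of inspecting and changing that (assuming the input DataFrame from the question above):
# how many partitions the DataFrame currently has
print(input.rdd.getNumPartitions())
# full shuffle into 16 roughly equal partitions
repartitioned = input.repartition(16)
# shrink to 4 partitions without a full shuffle
coalesced = input.coalesce(4)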
Parquet also supports partitioning of data based on the values of one or more columns, which affects query performance. The observations here come from Spark on YARN, but they hold true for other HDFS querying tools like Hive and Drill.
Apache Spark supports two types of partitioning, "hash partitioning" and "range partitioning". How the keys in your data are distributed or sequenced, as well as the action you want to perform on the data, can help you select the appropriate technique; many factors affect the choice of partitioning.
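A rough sketch of the two with a pair RDD (the sample data is made up; assumes an existing SparkContext called sc, as in the pyspark shell): partitionBy hash-partitions by key, while sortByKey range-partitions behind the scenes:
pairs = sc.parallelize([("a", 1), ("b", 2), ("c", 3), ("d", 4)])

hashed = pairs.partitionBy(4)               # hash partitioning: hash(key) % 4 chooses the partition
ranged = pairs.sortByKey(numPartitions=4)   # range partitioning: keys are split into sorted ranges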
We can see that the parquet format needed about 62% less disk space for the smaller dataset and 37% less for the larger dataset (the larger dataset has a UUID column, which can't be encoded and compressed as effectively as a column with possible repetitions). We can partition the data on any one or more of the columns.
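As a sketch (the path and column names are purely illustrative, and df is a hypothetical DataFrame that already has year and month columns), partitioning on more than one column simply nests the directories:
df.write.partitionBy('year', 'month').format("parquet").save('/data/events')
# resulting layout:
#   /data/events/year=2015/month=7/part-....parquet
#   /data/events/year=2015/month=8/part-....parquet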
I've found a few ways to do this now; I haven't run performance tests over them yet, so caveat emptor:
First we need to create a derived DataFrame (three ways shown below) and then write it out.
1) SQL queries (inline functions)
from pyspark.sql.types import IntegerType

# register a UDF that pulls the day of the month out of a timestamp
sqlContext.registerFunction("day", lambda ts: ts.day, IntegerType())
input.registerTempTable("input")
input_ts = sqlContext.sql(
    "select day(inserted_at) AS inserted_at_day, * from input")
2) SQL queries (non-inline) - very similar
def day(ts):
    return ts.day

sqlContext.registerFunction("day", day, IntegerType())
... rest as before
3) withColumn
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

day = udf(lambda ts: ts.day, IntegerType())
input_ts = input.withColumn('inserted_at_day', day(input.inserted_at))
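Depending on your Spark version there may also be a built-in that avoids the Python UDF entirely, e.g. dayofmonth in pyspark.sql.functions:
from pyspark.sql.functions import dayofmonth

input_ts = input.withColumn('inserted_at_day', dayofmonth(input.inserted_at))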
To write it out, just:
input_ts.write.partitionBy(['inserted_at_day']).format("parquet").save(dest, mode="append")
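And to answer my own worry above about queries: when reading the partitioned output back, a filter on the partition column means Spark only scans the matching sub-directories (a quick sketch, reusing dest from above):
# the partition column (inserted_at_day) shows up as a normal column on read
readback = sqlContext.read.format("parquet").load(dest)
# filtering on it prunes the scan down to the matching inserted_at_day=... directories
day_one = readback.filter(readback.inserted_at_day == 1)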