
spark partition data writing by timestamp

I have some data with a timestamp column of type long holding a standard epoch value in milliseconds. I need to save that data partitioned as yyyy/mm/dd/hh using Spark with Scala.

data.write.partitionBy("timestamp").format("orc").save("mypath") 

This just splits the data by the raw timestamp value, like below:

timestamp=1458444061098
timestamp=1458444061198

but I want it to be:

└── YYYY
    └── MM
        └── DD
            └── HH
asked Sep 27 '18 by kcoder


People also ask

How does Spark partition the data?

Apache Spark's Resilient Distributed Datasets (RDDs) are collections of data so large that they cannot fit on a single node and must be partitioned across multiple nodes. Apache Spark automatically partitions RDDs and distributes the partitions across different nodes.
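As a minimal illustration (assuming a live SparkContext named sc, as in spark-shell):

// sc: an existing SparkContext (available by default in spark-shell)
val rdd = sc.parallelize(1 to 1000)

// Each partition is held by a single executor
println(rdd.getNumPartitions)

// Inspect how the elements were spread across the partitions
rdd.mapPartitionsWithIndex { (idx, it) =>
  Iterator(s"partition $idx holds ${it.size} elements")
}.collect().foreach(println)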

How Spark decides number of partitions?

By default, Spark/PySpark creates as many partitions as there are CPU cores on the machine. The data for each partition resides on a single machine, and Spark/PySpark creates one task per partition. Spark shuffle operations move data from one partition to other partitions.
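For example (a sketch assuming an existing SparkSession named spark and a hypothetical DataFrame df):

// Usually equals the total number of cores available to the application
println(spark.sparkContext.defaultParallelism)

// Override the partition count explicitly when creating an RDD...
val rdd = spark.sparkContext.parallelize(1 to 1000, numSlices = 8)
println(rdd.getNumPartitions)  // 8

// ...or reshape an existing DataFrame
val wider    = df.repartition(200)  // full shuffle
val narrower = df.coalesce(10)      // merges partitions, avoids a full shuffle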

How do you stop shuffling in Spark?

Joining two tables is one of the most common operations in Spark. It usually requires a shuffle, which is costly due to the data movement between nodes. If one of the tables is small enough, no shuffle may be needed at all: by broadcasting the small table to each node in the cluster, the shuffle can simply be avoided.
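A minimal sketch of such a broadcast join (largeDf and smallDf are hypothetical DataFrames joined on a hypothetical id column):

import org.apache.spark.sql.functions.broadcast

// Broadcasting smallDf ships a full copy of it to every executor,
// so largeDf can be joined in place without being shuffled
val joined = largeDf.join(broadcast(smallDf), Seq("id"))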

What is shuffling in Spark?

Shuffling is a mechanism Spark uses to redistribute data across different executors and even across machines. Spark shuffling is triggered by transformation operations like groupByKey(), reduceByKey(), join(), groupBy(), etc. Spark shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.
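For instance, a classic shuffle-inducing transformation (a word count on a small hypothetical RDD, again assuming a SparkContext named sc):

// reduceByKey must gather all values for a key onto one partition,
// which marks a shuffle (stage) boundary in the job
val counts = sc
  .parallelize(Seq("a", "b", "a", "c"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect().foreach(println)  // (a,2), (b,1), (c,1)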


1 Answer

You can leverage various Spark SQL date/time functions for this. First, add a new timestamp-type column derived from the epoch column. Note that from_unixtime expects seconds while your values are in milliseconds, so divide by 1000:

import org.apache.spark.sql.functions._

val withDateCol = data
  .withColumn("date_col", from_unixtime(col("timestamp") / 1000).cast("timestamp"))

After this, you can add year, month, day and hour columns to the DataFrame and then partition by these new columns for the write (partitionBy is a DataFrameWriter method, so it must come after .write):

withDateCol
  .withColumn("year", year(col("date_col")))
  .withColumn("month", month(col("date_col")))
  .withColumn("day", dayofmonth(col("date_col")))
  .withColumn("hour", hour(col("date_col")))
  .drop("date_col")
  .write
  .partitionBy("year", "month", "day", "hour")
  .format("orc")
  .save("mypath")

The columns included in the partitionBy clause won't be part of the file schema; their values are encoded in the directory names instead.
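With the sample timestamps above (which fall on 2016-03-20 around 03:21 UTC; the exact hour written depends on the session time zone), the output layout becomes:

mypath
└── year=2016
    └── month=3
        └── day=20
            └── hour=3

Reading the path back with spark.read.orc("mypath") restores year, month, day and hour as columns through partition discovery.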

answered Sep 20 '22 by Constantine