I have some data with a timestamp column of type long containing standard epoch values (in milliseconds), and I need to save that data split into a yyyy/mm/dd/hh directory layout using Spark with Scala.
data.write.partitionBy("timestamp").format("orc").save("mypath")
This just splits the data by the raw timestamp value, like below:
timestamp=1458444061098
timestamp=1458444061198
but I want it to be like:
└── YYYY
    └── MM
        └── DD
            └── HH
You can leverage the Spark SQL date/time functions for this. First, add a new date column created from the unix timestamp column. Note that your values are epoch milliseconds, while from_unixtime expects seconds, so divide by 1000 first.
import org.apache.spark.sql.functions._

val withDateCol = data
  // lowercase "yyyy" (calendar year), not "YYYY" (week-based year); the
  // "yyyy-MM-dd HH:mm:ss" form yields a string that year()/month()/
  // dayofmonth()/hour() can parse
  .withColumn("date_col", from_unixtime(col("timestamp") / 1000, "yyyy-MM-dd HH:mm:ss"))
After this, you can add year, month, day and hour columns to the DF and then partition by these new columns for the write. Note that partitionBy is defined on DataFrameWriter, so the chain needs a .write before it:
withDateCol
  .withColumn("year", year(col("date_col")))
  .withColumn("month", month(col("date_col")))
  .withColumn("day", dayofmonth(col("date_col")))
  .withColumn("hour", hour(col("date_col")))
  .drop("date_col")
  .write
  .partitionBy("year", "month", "day", "hour")
  .format("orc")
  .save("mypath")
The columns included in the partitionBy clause won't be part of the file schema; they are encoded in the directory names instead.
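As a quick usage sketch (assuming an active SparkSession named spark), Spark rediscovers the partition columns from the directory names when reading back, so filters on them prune whole directories instead of scanning every file:

val readBack = spark.read.orc("mypath")
readBack.filter(col("year") === 2016 && col("hour") === 3).show()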