I am trying to test writing data to HDFS 2.7 using Spark 2.1. My data is a simple sequence of dummy values, and the output should be partitioned by two attributes: id and key.
// Simple case class to cast the data
case class SimpleTest(id: String, value1: Int, value2: Float, key: Int)
// Actual data to be stored
val testData = Seq(
  SimpleTest("test", 12, 13.5.toFloat, 1),
  SimpleTest("test", 12, 13.5.toFloat, 2),
  SimpleTest("test", 12, 13.5.toFloat, 3),
  SimpleTest("simple", 12, 13.5.toFloat, 1),
  SimpleTest("simple", 12, 13.5.toFloat, 2),
  SimpleTest("simple", 12, 13.5.toFloat, 3)
)
// Spark's workflow to distribute, partition and store
// sc and sql are the SparkContext and SparkSession, respectively
val testDataP = sc.parallelize(testData, 6)
val testDf = sql.createDataFrame(testDataP).toDF("id", "value1", "value2", "key")
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
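For reference, the sc and sql handles used above come from a standard session setup. A minimal sketch, assuming Spark 2.1 in local mode (the app name is an illustrative placeholder):

import org.apache.spark.sql.SparkSession

// In Spark 2.x a SparkSession wraps both the SQLContext and the SparkContext
val sql = SparkSession.builder()
  .appName("PartitionTest") // illustrative placeholder
  .master("local[*]")       // assumption: local run
  .getOrCreate()
val sc = sql.sparkContext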
I am expecting to get the following tree structure in HDFS:
- /path/to/file
|- /id=test/key=1/part-01.parquet
|- /id=test/key=2/part-02.parquet
|- /id=test/key=3/part-03.parquet
|- /id=simple/key=1/part-04.parquet
|- /id=simple/key=2/part-05.parquet
|- /id=simple/key=3/part-06.parquet
But when I run the previous code I get the following output:
/path/to/file/id=/key=24/
|-/part-01.parquet
|-/part-02.parquet
|-/part-03.parquet
|-/part-04.parquet
|-/part-05.parquet
|-/part-06.parquet
I do not know whether there is something wrong in the code or whether Spark is doing something else.
I'm executing spark-submit as follows:
spark-submit --name APP \
  --master local --driver-memory 30G --executor-memory 30G \
  --executor-cores 8 --num-executors 8 \
  --conf spark.io.compression.codec=lzf \
  --conf spark.akka.frameSize=1024 \
  --conf spark.driver.maxResultSize=1g \
  --conf spark.sql.orc.compression.codec=uncompressed \
  --conf spark.sql.parquet.filterPushdown=true \
  --class myClass myFatJar.jar
An ORC or Parquet file contains only the data columns. Partition columns can be added at write time, but their values are not stored in the data files; instead, the rows are divided into groups (partitions) based on those column values, which end up encoded in the directory layout.
A partition in Spark is an atomic chunk of data (a logical division of the data) stored on a node in the cluster.
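To make that concrete, here is a minimal sketch (assuming an active SparkSession named spark and a scratch path /tmp/partDemo, both illustrative) showing that the partition column ends up in the directory names, not inside the files:

import spark.implicits._ // needed for toDF on a local Seq

// "part" is not stored inside the Parquet files; it becomes the
// /part=1 and /part=2 directories under the output root
val demo = Seq(("a", 1), ("b", 2)).toDF("value", "part")
demo.write.partitionBy("part").parquet("/tmp/partDemo")

// Reading the root directory reconstructs "part" from the paths
spark.read.parquet("/tmp/partDemo").show()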
Interesting, since... well... "it works for me".
Since you describe your dataset with the SimpleTest case class, in Spark 2.1 you're one import spark.implicits._ away from having a typed Dataset. In my case, spark is sql. In other words, you don't have to create testDataP and testDf (using sql.createDataFrame).
import spark.implicits._
...
val testDf = testData.toDS
testDf.write.partitionBy("id", "key").parquet("/path/to/file")
In another terminal (after saving to the /tmp/testDf directory):
$ tree /tmp/testDf/
/tmp/testDf/
├── _SUCCESS
├── id=simple
│ ├── key=1
│ │ └── part-00003-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│ ├── key=2
│ │ └── part-00004-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
│ └── key=3
│ └── part-00005-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── id=test
├── key=1
│ └── part-00000-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
├── key=2
│ └── part-00001-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
└── key=3
└── part-00002-35212fd3-44cf-4091-9968-d9e2e05e5ac6.c000.snappy.parquet
8 directories, 7 files
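As a quick sanity check, reading the root directory back should reconstruct both partition columns from the id=.../key=... path segments (a sketch, using the same /tmp/testDf path as above):

// id and key are rebuilt from the directory names on read
val roundTrip = spark.read.parquet("/tmp/testDf")
roundTrip.orderBy("id", "key").show()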