I'm using PySpark for a classic ETL job (load a dataset, process it, save it) and want to save my DataFrame as files/directories partitioned by a "virtual" column. By "virtual" I mean that I have a Timestamp column, a string containing an ISO 8601 encoded date, and I want to partition by year / month / day, but the DataFrame has no Year, Month or Day column. I can derive these columns from Timestamp, but I don't want the resulting items to have any of them serialized.
The file structure resulting from saving the DataFrame to disk should look like:
/
    year=2016/
        month=01/
            day=01/
                part-****.gz
Is there a way to do what I want with Spark / PySpark?
Partition in memory: you can change how a DataFrame is partitioned by calling the repartition() or coalesce() transformations. Partition on disk: when writing a PySpark DataFrame back to disk, you can choose how the data is partitioned by column with the partitionBy() method of the DataFrameWriter.
saveAsTable() saves the content of the DataFrame as the specified table. If the table already exists, the behavior of this function depends on the save mode, specified by the mode function (the default is to throw an exception).
In PySpark, withColumn() is a widely used DataFrame transformation. It can change the value of an existing column, convert its data type, or create a new column.
inferSchema -> Infers the data type of each field automatically. If this option is set to true, the API reads some sample records from the file to infer the schema. If it is set to false and specific types are needed, the schema must be specified explicitly.
Columns which are used for partitioning are not included in the serialized data itself. For example, if you create a DataFrame like this:
df = sc.parallelize([
    (1, "foo", 2.0, "2016-02-16"),
    (2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])
and write it as follows:
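import tempfile
from pyspark.sql.functions import col, dayofmonth, month, year

# save() fails by default if the path exists, so only generate a name here
outdir = tempfile.mktemp()

# Parse the string column as a date
dt = col("date").cast("date")

# Pairs of (extraction function, partition column name)
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")]

# Keep every original column and add the derived year / month / day
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname]

(df
    .select(*exprs)
    .write
    .partitionBy(*(name for _, name in fname))
    .format("json")
    .save(outdir))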
individual files won't contain partition columns:
import os

(sqlContext.read
    .json(os.path.join(outdir, "year=2016/month=2/day=16/"))
    .printSchema())
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
Partitioning data is stored only in the directory structure and is not duplicated in the serialized files. It is attached only when you read a complete or partial directory tree:
sqlContext.read.json(outdir).printSchema()
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
## |-- year: integer (nullable = true)
## |-- month: integer (nullable = true)
## |-- day: integer (nullable = true)
sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema()
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
## |-- day: integer (nullable = true)
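The three schemas above follow from where the read starts: only the key=value directories below the path you read are turned back into columns. The plain-Python sketch below mimics that rule for illustration. It is a simplification, not Spark's actual partition discovery (which also infers column types), and the function name discovered_partition_columns is hypothetical.

```python
import os


def discovered_partition_columns(read_path, leaf_dir):
    """Collect key=value directory segments between read_path and leaf_dir.

    Hypothetical helper: a simplified model of which partition columns
    would be attached when reading starts at read_path.
    """
    rel = os.path.relpath(leaf_dir, read_path)
    columns = {}
    for segment in rel.split(os.sep):
        if "=" in segment:
            key, value = segment.split("=", 1)
            columns[key] = value
    return columns


# Same layout as in the example above; "out" stands in for outdir.
leaf = os.path.join("out", "year=2016", "month=2", "day=16")

# Reading the root recovers year, month and day.
from_root = discovered_partition_columns("out", leaf)

# Reading the month directory recovers only day.
from_month = discovered_partition_columns(
    os.path.join("out", "year=2016", "month=2"), leaf)

# Reading the leaf directory recovers no partition columns.
from_leaf = discovered_partition_columns(leaf, leaf)
```

In this model the values stay strings; Spark itself infers integer types for them, as the printed schemas show.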