I have the following partitioned file paths that we read from S3:
prefix/company=abcd/service=xyz/date=2021-01-01/file_01.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_02.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_03.json
I read these with PySpark like this:
self.spark \
    .read \
    .option("basePath", 'prefix') \
    .schema(self.schema) \
    .json(['company=abcd/service=xyz/date=2021-01-01/'])
All the files have the same schema and get loaded into the table as rows. A file could look something like this:
{"id": "foo", "color": "blue", "date": "2021-12-12"}
The issue is that sometimes the files contain a date field that clashes with one of my partition columns (date). So I want to know if it is possible to load the files without the partition columns, rename the JSON date column, and then add the partition columns afterwards.
Final table would be:
| id  | color | file_date  | company | service | date       |
|-----|-------|------------|---------|---------|------------|
| foo | blue  | 2021-12-12 | abcd    | xyz     | 2021-01-01 |
| bar | red   | 2021-10-10 | abcd    | xyz     | 2021-01-01 |
| baz | green | 2021-08-08 | abcd    | xyz     | 2021-01-01 |
EDIT:
More information: I sometimes have 5 or 6 partitions and date is one of them (not the last). I also need to read multiple date partitions at once. The schema that I pass to Spark also contains the partition columns, which makes it more complicated.
I don't control the input data, so I need to read it as is. I can rename the file columns but not the partition columns.
Would it be possible to add an alias to the file columns, as we would do when joining two dataframes?
Spark 3.1
One way is to list the files under the prefix S3 path, using the Hadoop FS API for example, and then pass that list to spark.read. This way Spark won't detect them as partitions and you'll be able to rename the file columns if needed.
After you load the files into a dataframe, loop through the df columns and rename those that are also present in your partitions_cols list (by adding a file_ prefix, for example).
Finally, parse the partition values from input_file_name() using the regexp_extract function.
Here's an example:
from pyspark.sql import functions as F
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()
s3_path = "s3://bucket/prefix"
file_cols = ["id", "color", "date"]
partitions_cols = ["company", "service", "date"]
# listing all files for input path
json_files = []
files = Path(s3_path).getFileSystem(conf).listFiles(Path(s3_path), True)
while files.hasNext():
    path = files.next().getPath()
    if path.getName().endswith(".json"):
        json_files.append(path.toString())
df = spark.read.json(json_files)  # here you can pass the schema of the files, without the partition columns
# rename file columns that also exist as partition columns
df = df.select(*[
    F.col(c).alias(c) if c not in partitions_cols else F.col(c).alias(f"file_{c}")
    for c in df.columns
])
# parse partitions from filenames
for p in partitions_cols:
    df = df.withColumn(p, F.regexp_extract(F.input_file_name(), f"/{p}=([^/]+)/", 1))
df.show()
#+-----+----------+---+-------+-------+----------+
#|color| file_date| id|company|service| date|
#+-----+----------+---+-------+-------+----------+
#|green|2021-08-08|baz| abcd| xyz|2021-01-01|
#| blue|2021-12-12|foo| abcd| xyz|2021-01-01|
#| red|2021-10-10|bar| abcd| xyz|2021-01-01|
#+-----+----------+---+-------+-------+----------+
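Regarding the schema mentioned in your edit: since the schema you already pass to Spark also contains the partition columns, you could derive a file-only schema from it before the spark.read.json call above. A minimal sketch, assuming full_schema is your existing schema and reusing the file_cols list from the example:

from pyspark.sql.types import StructType

# keep only the fields that actually appear inside the JSON files
file_schema = StructType([f for f in full_schema.fields if f.name in file_cols])

df = spark.read.schema(file_schema).json(json_files)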
The easiest option would be to simply change the partition column name in the directory layout. You can then read in the data and rename the columns as you wish, and you won't lose the benefits of partitioning either.
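As a rough illustration (the partition_date directory name and the bucket path are made up here, assuming you could rewrite the layout):

# hypothetical layout where the partition directory is no longer called "date":
# s3://bucket/prefix/company=abcd/service=xyz/partition_date=2021-01-01/file_01.json
df = (
    spark.read
    .option("basePath", "s3://bucket/prefix")
    .json("s3://bucket/prefix/company=abcd/service=xyz/partition_date=2021-01-01/")
    .withColumnRenamed("date", "file_date")        # the JSON field from the files
    .withColumnRenamed("partition_date", "date")   # the renamed partition column
)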
If that is not an option, you could read in the JSONs using a wildcard for the partitions, rename the date column to 'file_date', and then add the partition date by extracting it from the file name. You can get the file name from input_file_name in pyspark.sql.functions.
Edit: I missed that you have other partitioned columns before the date; you'd have to extract them from the file name as well, which makes this less ideal.
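A minimal sketch of that approach (the bucket path, the glob pattern, and the file_schema variable are assumptions based on the question; the glob resolves to the leaf directories, so Spark does not infer the partition columns):

from pyspark.sql import functions as F

# file_schema: schema of the JSON files only, without the partition columns
df = (
    spark.read
    .schema(file_schema)
    .json("s3://bucket/prefix/company=*/service=*/date=*/")  # glob over the leaf partition dirs
    .withColumnRenamed("date", "file_date")                  # the JSON field
    # rebuild the partition columns from the file path
    .withColumn("company", F.regexp_extract(F.input_file_name(), r"/company=([^/]+)/", 1))
    .withColumn("service", F.regexp_extract(F.input_file_name(), r"/service=([^/]+)/", 1))
    .withColumn("date", F.regexp_extract(F.input_file_name(), r"/date=([^/]+)/", 1))
)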