 

Read Spark data with a column that clashes with a partition name

I have the following file paths that we read with partitions on S3:

prefix/company=abcd/service=xyz/date=2021-01-01/file_01.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_02.json
prefix/company=abcd/service=xyz/date=2021-01-01/file_03.json

When I read these with PySpark:

self.spark \
    .read \
    .option("basePath", 'prefix') \
    .schema(self.schema) \
    .json(['company=abcd/service=xyz/date=2021-01-01/'])

All the files have the same schema and get loaded in the table as rows. A file could be something like this:

{"id": "foo", "color": "blue", "date": "2021-12-12"}

The issue is that sometimes the files contain a date field that clashes with one of my partition columns (date). So I want to know if it is possible to load the files without the partition columns, rename the JSON date column, and then add the partition columns back.

Final table would be:

| id  | color | file_date  | company | service | date       |
-------------------------------------------------------------
| foo | blue  | 2021-12-12 | abcd    | xyz     | 2021-01-01 |
| bar | red   | 2021-10-10 | abcd    | xyz     | 2021-01-01 |
| baz | green | 2021-08-08 | abcd    | xyz     | 2021-01-01 |

EDIT:

More information: I sometimes have 5 or 6 partitions, and date is one of them (not the last). I also need to read multiple date partitions at once. The schema that I pass to Spark also contains the partition columns, which makes it more complicated.

I don't control the input data, so I need to read it as is. I can rename the file columns but not the partition columns.

Would it be possible to add an alias to the file columns, as we would do when joining two dataframes?

Spark 3.1

asked Dec 13 '21 by JBernardo


2 Answers

One way is to list the files under the prefix S3 path (using the Hadoop FS API, for example) and pass that list to spark.read. This way Spark won't detect them as partitions and you'll be able to rename the file columns if needed.

After you load the files into a dataframe, loop through the df columns and rename those that are also present in your partitions_cols list (by adding a file_ prefix, for example).

Finally, parse the partition values from input_file_name() using the regexp_extract function.

Here's an example:

from pyspark.sql import functions as F

# access the Hadoop FileSystem API through the JVM gateway
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
conf = sc._jsc.hadoopConfiguration()

s3_path = "s3://bucket/prefix"
file_cols = ["id", "color", "date"]
partitions_cols = ["company", "service", "date"]

# list all JSON files under the input path
json_files = []
files = Path(s3_path).getFileSystem(conf).listFiles(Path(s3_path), True)

while files.hasNext():
    path = files.next().getPath()
    if path.getName().endswith(".json"):
        json_files.append(path.toString())

df = spark.read.json(json_files)  # you can also pass the schema of the files (without the partition columns) here

# rename file columns that clash with partition column names
df = df.select(*[
    F.col(c).alias(c) if c not in partitions_cols else F.col(c).alias(f"file_{c}")
    for c in df.columns
])

# parse partitions from filenames
for p in partitions_cols:
    df = df.withColumn(p, F.regexp_extract(F.input_file_name(), f"/{p}=([^/]+)/", 1))

df.show()

#+-----+----------+---+-------+-------+----------+
#|color| file_date| id|company|service|      date|
#+-----+----------+---+-------+-------+----------+
#|green|2021-08-08|baz|   abcd|    xyz|2021-01-01|
#| blue|2021-12-12|foo|   abcd|    xyz|2021-01-01|
#|  red|2021-10-10|bar|   abcd|    xyz|2021-01-01|
#+-----+----------+---+-------+-------+----------+
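
As a side note, here is a minimal sketch of what the optional file schema mentioned in the comment above could look like (field names and types are assumptions based on the sample record in the question):

from pyspark.sql.types import StructType, StructField, StringType

# schema of the file columns only, without the partition columns
# (field names/types assumed from the sample JSON record in the question)
file_schema = StructType([
    StructField("id", StringType()),
    StructField("color", StringType()),
    StructField("date", StringType()),
])

df = spark.read.schema(file_schema).json(json_files)
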
answered Oct 13 '22 by blackbishop


Easiest would be to simply change the partition column name (i.e., the directory name in the path). You can then read in the data and rename the columns as you wish. You won't lose the benefits of partitioning either.
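
A minimal sketch of what that could look like, assuming the data can be rewritten so the date partition directory is named, say, partition_date= instead of date= (bucket and prefix are placeholders):

# hypothetical layout after the rename:
# s3://bucket/prefix/company=abcd/service=xyz/partition_date=2021-01-01/file_01.json
df = spark.read \
    .option("basePath", "s3://bucket/prefix") \
    .json("s3://bucket/prefix/company=abcd/service=xyz/partition_date=2021-01-01/")

# the JSON "date" column no longer clashes with a partition column,
# so both can be kept and renamed as needed
df = df.withColumnRenamed("date", "file_date") \
       .withColumnRenamed("partition_date", "date")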

If that is not an option, you could read in the JSON files using a wildcard for the partitions, rename the date column to file_date, and then add the partition date by extracting it from the filename. You can get the filename from input_file_name in pyspark.sql.functions.
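
A rough sketch of that approach (paths are placeholders and the regular expressions assume the directory layout shown in the question):

from pyspark.sql import functions as F

# read through a wildcard and without a basePath, so the partition
# directories are not picked up as columns
df = spark.read.json("s3://bucket/prefix/company=abcd/service=xyz/date=*/")

# the remaining "date" column comes from the files themselves
df = df.withColumnRenamed("date", "file_date")

# recover the partition values from the file path
path = F.input_file_name()
for p in ["company", "service", "date"]:
    df = df.withColumn(p, F.regexp_extract(path, f"/{p}=([^/]+)/", 1))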

Edit: I missed that you have other partitioned columns before the date; you'd have to extract them from the filename as well, making it less ideal.

answered Oct 13 '22 by ScootCork