I have an external table defined in AWS Glue's catalog. It is partitioned by date, partitions for new dates are added daily, and the layout in S3 looks roughly like this:
s3://my-data-lake/test-table/
2017/01/01/
part-0000-blah.csv.gz
.
.
part-8000-blah.csv.gz
2017/01/02/
part-0000-blah.csv.gz
.
.
part-7666-blah.csv.gz
How could I use Glue/Spark to convert this to Parquet that is also partitioned by date and split across n files per day? The examples don't cover partitioning, splitting, or provisioning (how many nodes and how big). Each day contains a couple hundred GB.
Because the source CSVs are not necessarily in the right partitions (wrong date) and are inconsistent in size, I'm hoping to write partitioned Parquet with the right partitions and more consistent file sizes.
Since the source CSV files are not necessarily under the right date, you could add collect date/time information to each record (or use any date field that is already available):
{"collectDateTime": {
"timestamp": 1518091828,
"timestampMs": 1518091828116,
"day": 8,
"month": 2,
"year": 2018
}}
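If your records only carry a raw epoch timestamp, the fields above could be derived along these lines. This is a minimal sketch in plain Python, not part of the original answer: the helper name build_collect_date_time and the choice of UTC for the calendar fields are assumptions.

```python
from datetime import datetime, timezone


def build_collect_date_time(timestamp_ms):
    """Build the collectDateTime structure from an epoch timestamp in milliseconds.

    Hypothetical helper: the name and the use of UTC are assumptions.
    """
    timestamp_s = timestamp_ms // 1000  # whole seconds since the epoch
    moment = datetime.fromtimestamp(timestamp_s, tz=timezone.utc)
    return {
        "collectDateTime": {
            "timestamp": timestamp_s,
            "timestampMs": timestamp_ms,
            "day": moment.day,
            "month": moment.month,
            "year": moment.year,
        }
    }


# The sample record shown above corresponds to this input.
print(build_collect_date_time(1518091828116))
```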
Then your job could carry this information into the output DynamicFrame and ultimately use the year, month and day fields as partition keys. Some sample code showing how to achieve this:
import sys
import datetime
from pyspark.context import SparkContext  # needed for SparkContext.getOrCreate() below
from pyspark.sql.types import *
from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
###
# CREATE THE NEW SIMPLIFIED LINE
##
def create_simplified_line(event_dict):
    # collect date time
    collect_date_time_dict = event_dict["collectDateTime"]
    new_line = {
        # TODO: COPY YOUR DATA HERE
        "myData": event_dict["myData"],
        "someOtherData": event_dict["someOtherData"],
        "timestamp": collect_date_time_dict["timestamp"],
        # int() works on Python 2 and 3; long() exists only on Python 2
        "timestampmilliseconds": int(collect_date_time_dict["timestamp"]) * 1000,
        "year": collect_date_time_dict["year"],
        "month": collect_date_time_dict["month"],
        "day": collect_date_time_dict["day"]
    }
    return new_line
###
# MAIN FUNCTION
##
# context
glueContext = GlueContext(SparkContext.getOrCreate())
# fetch from previous day source bucket
previous_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
# build s3 paths
s3_path = "s3://source-bucket/path/year={}/month={}/day={}/".format(previous_date.year, previous_date.month, previous_date.day)
# create dynamic_frame
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path]}, format="json", format_options={}, transformation_ctx="dynamic_frame")
# resolve choices (optional)
dynamic_frame_resolved = ResolveChoice.apply(frame=dynamic_frame,choice="project:double",transformation_ctx="dynamic_frame_resolved")
# transform the source dynamic frame into a simplified version
result_frame = Map.apply(frame=dynamic_frame_resolved, f=create_simplified_line)
# write to simple storage service in parquet format
glueContext.write_dynamic_frame.from_options(frame=result_frame, connection_type="s3", connection_options={"path":"s3://target-bucket/path/","partitionKeys":["year", "month", "day"]}, format="parquet")
I did not test it; the script is only a sample of how to achieve this, but it is fairly straightforward.
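With partitionKeys set to year, month and day, the output is laid out under Hive-style key=value directories below the target path. The plain-Python sketch below shows the prefix a given record would land under; partition_prefix is a hypothetical helper for illustration only, not a Glue API.

```python
def partition_prefix(record, partition_keys):
    """Return the Hive-style relative prefix (key=value/...) for a record.

    Hypothetical helper that mimics the directory layout of a partitioned write.
    """
    return "".join("{}={}/".format(key, record[key]) for key in partition_keys)


record = {"myData": "a", "year": 2018, "month": 2, "day": 8}
prefix = partition_prefix(record, ["year", "month", "day"])
print("s3://target-bucket/path/" + prefix)
# prints: s3://target-bucket/path/year=2018/month=2/day=8/
```

Integer partition values are not zero-padded here, which matches the way the sample script builds its source path with format().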
UPDATE
1) As for having specific file sizes/numbers in output partitions:
At the time of writing, Spark's coalesce and repartition features were not yet implemented in Glue's Python API (only in Scala).
You can instead convert your DynamicFrame into a DataFrame and leverage Spark's partitioning capabilities.
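# DynamicFrame must be imported for the conversion back
from awsglue.dynamicframe import DynamicFrame
# Convert to a DataFrame and repartition (datasource0 is your DynamicFrame); repartition(1) produces a single Spark partition, so pass a larger number for more output files
partitioned_dataframe = datasource0.toDF().repartition(1)
# Convert back to a DynamicFrame for further processing
partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")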
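To choose the number to pass to repartition, one rough approach is to divide the data volume by a target file size. The sketch below is plain Python under stated assumptions: estimate_num_files is a hypothetical helper, and the 200 GiB and 512 MiB figures are illustrative only. The estimate is based on input size, and the Parquet output size will differ, so treat the result as a starting point to tune.

```python
def estimate_num_files(total_bytes, target_file_bytes):
    """Rough number of partitions needed so each holds about target_file_bytes.

    Hypothetical helper; uses ceiling division and never returns less than 1.
    """
    if target_file_bytes <= 0:
        raise ValueError("target_file_bytes must be positive")
    return max(1, (total_bytes + target_file_bytes - 1) // target_file_bytes)


GIB = 1024 ** 3
MIB = 1024 ** 2

# Illustrative numbers: about 200 GiB per day, aiming for roughly 512 MiB per file.
num_files = estimate_num_files(200 * GIB, 512 * MIB)
print(num_files)  # prints: 400
```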
The good news is that Glue has an interesting feature: if you have more than 50,000 input files per partition, it will automatically group them for you.
If you want to enable this behavior explicitly regardless of the number of input files (your case), you can set the following connection_options when creating a dynamic frame from options:
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, format="json", format_options={}, transformation_ctx="dynamic_frame")
In the previous example, Glue would attempt to group files into 1 MB groups (groupSize is given in bytes).
It is worth mentioning that this is not the same as coalesce, but it may help if your goal is to reduce the number of files per partition.
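Since groupSize is expressed in bytes, a small helper can make the intended size easier to read. This is a sketch only: grouping_options is a hypothetical function, the 128 MB figure and the sample path are illustrative, and passing groupSize as a string follows the style of the AWS documentation examples (an assumption about the preferred type; the example above passes an integer).

```python
def grouping_options(paths, group_size_mb):
    """Build S3 connection_options that ask Glue to group input files.

    Hypothetical helper. groupSize is in bytes and is passed as a string
    here, which is an assumption about the preferred type.
    """
    return {
        "paths": list(paths),
        "groupFiles": "inPartition",
        "groupSize": str(group_size_mb * 1024 * 1024),
    }


options = grouping_options(["s3://source-bucket/path/year=2018/month=2/day=8/"], 128)
print(options["groupSize"])  # prints: 134217728
```

The resulting dictionary could then be passed as connection_options to create_dynamic_frame.from_options.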
2) If files already exist in the destination, will it just safely add the new ones (not overwrite or delete)?
Glue's default SaveMode for write_dynamic_frame.from_options is append. Spark describes that mode as follows:
When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
3) Given each source partition may be 30-100 GB, what's a guideline for the number of DPUs?
I'm afraid I won't be able to answer that. It depends on how fast it'll load your input files (size/number), your script's transformations, etc.