I am using AWS Glue to transform some JSON files. I have added the files to Glue from S3. The job reads the files in fine, runs successfully, and a file is added to the correct S3 bucket. The issue is that I can't name the output file - it is given a random name, and it is not given the .json extension.
How can I name the file and also add the extension to the output?
Due to the way Spark writes its output (each partition ends up as a part-* file), it's not possible to set the file name directly. However, it's possible to rename the file right after it is written.
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("s3://{bucket_name}"), sc._jsc.hadoopConfiguration())
file_path = "s3://{bucket_name}/processed/source={source_name}/year={partition_year}/week={partition_week}/"
df.coalesce(1).write.format("json").mode("overwrite").option("codec", "gzip").save(file_path)

# rename the created file
created_file_path = fs.globStatus(Path(file_path + "part*.gz"))[0].getPath()
fs.rename(
    created_file_path,
    Path(file_path + "{desired_name}.jl.gz"))
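Since the question asks for a plain .json extension, a minimal variation of the same approach, assuming the same placeholders and skipping the gzip compression, would be:
# write uncompressed JSON, then rename the single part file to a .json name
df.coalesce(1).write.format("json").mode("overwrite").save(file_path)

created_file_path = fs.globStatus(Path(file_path + "part*.json"))[0].getPath()
fs.rename(created_file_path, Path(file_path + "{desired_name}.json"))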
The following code worked for me -
source_DataFrame = glueContext.create_dynamic_frame.from_catalog(database = databasename, table_name = source_tablename_in_catalog, transformation_ctx = "source_DataFrame")
source_DataFrame = source_DataFrame.toDF().coalesce(1)  # without coalesce(1), Spark creates many part-000* files, depending on the data
from awsglue.dynamicframe import DynamicFrame
DyF = DynamicFrame.fromDF(source_DataFrame, glueContext, "DyF")
# writing the file as usual in Glue; I have used some partitions here.
# keep "partitionKeys": [] if there are no partitions
output_Parquet = glueContext.write_dynamic_frame.from_options(frame = DyF, connection_type = "s3", format = "parquet", connection_options = {"path": destination_path + "/", "partitionKeys": ["department","team","card","datepartition"]}, transformation_ctx = "output_Parquet")
import boto3
client = boto3.client('s3')
# list the objects in the bucket (list_objects_v2 returns at most 1,000 keys; see the paginator sketch after this script for larger outputs)
response = client.list_objects_v2(Bucket=bucket_name)
names = response["Contents"]
# find the files that have part-000* in their Key
particulars = [name['Key'] for name in names if 'part-000' in name['Key']]
# keep the prefix before part-000* so the partition structure is retained
location = [particular.split('part-000')[0] for particular in particulars]
# constraint: copy_object has a 5 GB limit
for key, particular in enumerate(particulars):
    client.copy_object(Bucket=bucket_name, CopySource=bucket_name + "/" + particular, Key=location[key] + "newfile")
    client.delete_object(Bucket=bucket_name, Key=particular)
job.commit()
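A note on the listing step above: list_objects_v2 returns at most 1,000 keys per call, so if the job writes more objects than that, the part files can be collected with a paginator instead. A minimal sketch, reusing the same client and bucket_name:
# collect all part-000* keys, paging through the full bucket listing
paginator = client.get_paginator('list_objects_v2')
particulars = []
for page in paginator.paginate(Bucket=bucket_name):
    for obj in page.get('Contents', []):
        if 'part-000' in obj['Key']:
            particulars.append(obj['Key'])
location = [particular.split('part-000')[0] for particular in particulars]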
The constraint is that the copy (copy_object) will fail when the file is larger than 5 GB. In that case you can use this instead:
s3 = boto3.resource('s3')
for key, particular in enumerate(particulars):
    copy_source = {
        'Bucket': bucket_name,
        'Key': particular
    }
    s3.meta.client.copy(copy_source, bucket_name, location[key] + "newfile")
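s3.meta.client.copy performs a managed (multipart) transfer, so it is not subject to the 5 GB copy_object limit. It does not delete the source object, though, so if the original part files should be removed as in the earlier loop, a delete can follow the copy. A minimal sketch reusing the same names:
for key, particular in enumerate(particulars):
    copy_source = {'Bucket': bucket_name, 'Key': particular}
    s3.meta.client.copy(copy_source, bucket_name, location[key] + "newfile")
    # remove the original part file once the managed copy has completed
    s3.meta.client.delete_object(Bucket=bucket_name, Key=particular)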