I have a Databricks notebook set up as follows.
My problem is that I cannot choose the name of the output file, and I need a static CSV filename.
Is there a way to rename it in PySpark?
## Blob Storage account information
storage_account_name = ""
storage_account_access_key = ""
## File location and File type
file_location = "path/.blob.core.windows.net/Databricks_Files/input"
file_location_new = "path/.blob.core.windows.net/Databricks_Files/out"
file_type = "csv"
## Connection string to connect to blob storage
spark.conf.set(
    "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
    storage_account_access_key)
This is followed by writing the file out after the data transformation:
dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
    .mode('overwrite').option("header", "true").save(file_location_new)
The file is then written as "part-00000-tid-336943946930983.....csv",
whereas the goal is to have "Output.csv".
Another approach I looked at was recreating this in plain Python, but I have not yet found in the documentation how to write the file back to Blob Storage.
I know the method to retrieve a file from Blob Storage is .get_blob_to_path, according to the Microsoft docs.
Any help here is greatly appreciated.
Hadoop/Spark writes the result of a computation in parallel, one file per partition, so you will see many part-<number>-.... files in an HDFS output path such as Output/ that you named.
If you want all results of a computation in one file, you can merge them with the command hadoop fs -getmerge /output1/part* /output2/Output.csv, which writes the merged file to the local filesystem, or reduce the number of partitions to one, for example with the coalesce(1) function.
So in your scenario, coalesce(1) has to be called on the DataFrame itself, before .write, because the DataFrameWriter returned by .write has no coalesce method. This yields a single part-... file in the output folder, as below.
dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
    .mode('overwrite').option("header", "true").save(file_location_new)
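The merge that getmerge performs can also be sketched in plain Python for part files that are reachable as ordinary local files. This is only an illustration, not the Hadoop implementation: the function name merge_part_files and its parameters are made up here, and it assumes the parts are named part-*.csv and were written with header=true, in which case each part carries its own header row and a plain concatenation would repeat it.

```python
from pathlib import Path


def merge_part_files(src_dir, dest_file, has_header=True):
    """Concatenate Spark-style part files into one CSV, keeping a single header.

    Hypothetical helper: assumes every part file is named part-*.csv and,
    when has_header is True, starts with the same header line.
    """
    # Sort so the parts are merged in partition order (part-00000, part-00001, ...).
    parts = sorted(Path(src_dir).glob("part-*.csv"))
    with open(dest_file, "w", newline="") as out:
        for index, part in enumerate(parts):
            with open(part, newline="") as src:
                lines = src.readlines()
            # Drop the repeated header from every part except the first one.
            if has_header and index > 0:
                lines = lines[1:]
            out.writelines(lines)
    return len(parts)
```

Calling merge_part_files("Output", "Output.csv") would then produce one file with one header row. Marker files such as _SUCCESS are skipped because they do not match the part-*.csv pattern.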
On their own, coalesce and repartition do not save the DataFrame into a single, normally named file.
I ended up renaming the one CSV file and deleting the folder that holds the log files:
import os

def save_csv(df, location, filename):
    # Spark writes a folder of part files, so write to a temporary folder first
    outputPath = os.path.join(location, filename + '_temp.csv')
    df.repartition(1).write.format("com.databricks.spark.csv").mode("overwrite") \
        .options(header="true").option("delimiter", "\t").save(outputPath)
    # list the temporary folder through the local /dbfs mount
    csv_files = os.listdir(os.path.join('/dbfs', outputPath))
    # move the single part file to the normally named file
    for file in csv_files:
        if file.endswith('.csv'):
            dbutils.fs.mv(os.path.join(outputPath, file), os.path.join(location, filename))
    # remove the temporary folder together with the remaining log files
    dbutils.fs.rm(outputPath, True)

# using save_csv
save_csv_location = 'mnt/.....'
save_csv(df, save_csv_location, 'name.csv')
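The rename-and-clean-up step can also be tried outside a notebook with only the standard library. The sketch below is an illustration and not part of the answer's code: promote_single_part is a hypothetical name, and it assumes the output folder is visible as a local path (on Databricks, the /dbfs mount that the os.listdir call above already relies on) and that final_path lies outside that folder.

```python
import shutil
from pathlib import Path


def promote_single_part(output_dir, final_path):
    """Move the only part-*.csv in output_dir to final_path, then delete output_dir.

    Hypothetical helper: final_path must not be inside output_dir, because the
    whole folder is removed at the end.
    """
    output_dir = Path(output_dir)
    parts = sorted(output_dir.glob("part-*.csv"))
    # Fail loudly if the data was not reduced to exactly one partition.
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    final_path = Path(final_path)
    final_path.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(parts[0]), str(final_path))
    # Remove the temporary folder along with _SUCCESS and other marker files.
    shutil.rmtree(output_dir)
    return final_path
```

Raising an error when more than one part file is found avoids silently keeping only part of the data, which the loop-based version would do if repartition(1) were ever removed.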