I'm trying to get the input file name (or path) for every file loaded through an S3 data catalog in AWS Glue.
I've read in a few places that input_file_name() should provide this information (though with the caveat that this only works when calling from_catalog and not from_options, which I believe I am!).
So the code below seems like it should work, but it always returns empty values for every input_file_name.
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session
job = Job(gc)
job.init(args['JOB_NAME'], args)
# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)
df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)
Resulting output:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
Is there another way that I should be creating the frame that ensures input_file_name() is populated? I've now tried to build a source frame through create_dynamic_frame.from_catalog, create_dynamic_frame.from_options, and getSource().getFrame(), but I get the same result for each: an empty input_file_name column.
I'll add my experience as well: in my case I received an empty result because I was calling the cache() method before adding the column.
For example:
import pyspark.sql.functions as F
df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())
df.show()
I receive:
+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
| 13| |
| 33| |
| 53| |
| 73| |
| 93| |
+-------------+---------------+
But if I remove the line df.cache(), the input_file_name column correctly shows the input file names.
A workaround might be to call F.input_file_name() before caching.