Why is input_file_name() empty for S3 catalog sources in pyspark?

I'm trying to get the input file name (or path) for every file loaded through an S3 data catalog in AWS Glue.

I've read in a few places that input_file_name() should provide this information (with the caveat that it only works when calling from_catalog and not from_options, which I believe I am doing!).

So the code below seems like it should work, but it always returns an empty value in the input_file_name column for every row.

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.functions import input_file_name


args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
sc = SparkContext()
gc = GlueContext(sc)
spark = gc.spark_session


job = Job(gc)
job.init(args['JOB_NAME'], args)


# Get the source frame from the Glue Catalog, which describes files in S3
fm_source = gc.create_dynamic_frame.from_catalog(
    database='database_name',
    table_name='table_name',
    transformation_ctx='fm_source',
)

df_source = fm_source.toDF().withColumn('input_file_name', input_file_name())
df_source.show(5)

Resulting output:

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

Is there another way that I should be creating the frame that ensures input_file_name() is populated? I've now tried to build a source frame through create_dynamic_frame.from_catalog, create_dynamic_frame.from_options and getSource().getFrame(), but I get the same result of an empty input_file_name column for each.

Asked Jun 28 '19 at 16:06 by Will Croft


1 Answer

I'll add my own experience as well: in my case, I received an empty result because I called the cache() method.

For example:

import pyspark.sql.functions as F

df = spark.read.json("/my/folder/test.json")
df.cache()
df = df.withColumn("input_file_name", F.input_file_name())

df.show()

I get:

+-------------+---------------+
|other_columns|input_file_name|
+-------------+---------------+
|           13|               |
|           33|               |
|           53|               |
|           73|               |
|           93|               |
+-------------+---------------+

But if I remove the df.cache() line, the input_file_name column correctly shows the input file names.

A workaround might be to add the input_file_name column with F.input_file_name() before calling cache().

Answered Oct 11 '22 at 13:10 by Vzzarr