
How to prevent processing files twice with Spark DataFrames

I'm processing TSV files on S3 into Parquet on S3 using AWS Glue. Because of non-UTF-8 incoming files I am forced to use DataFrames instead of DynamicFrames to process my data (it's a known issue with no workaround that DynamicFrames fail completely on any non-UTF-8 characters). This also seems to mean that I cannot use Job Bookmarks in Glue to keep track of which S3 TSV files I have already processed.

My code looks like this:

# pylint: skip-file
# flake8: noqa
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import split
from awsglue.dynamicframe import DynamicFrame

# @params: [JOB_NAME, s3target, s3source]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 's3target', 's3source'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Define massive list of fields in the schema
fields = [
    StructField("accept_language", StringType(), True),
    StructField("browser", LongType(), True),
    .... huge list ...
    StructField("yearly_visitor", ShortType(), True),
    StructField("zip", StringType(), True)
]

schema = StructType(fields)

# Read in data using Spark DataFrame and not an AWS Glue DynamicFrame to avoid issues with non-UTF8 characters
df0 = spark.read.format("com.databricks.spark.csv") \
    .option("quote", "\"") \
    .option("delimiter", u'\u0009') \
    .option("charset", 'utf-8') \
    .schema(schema) \
    .load(args['s3source'] + "/*.tsv.gz")

# Remove all rows that are entirely nulls
df1 = df0.dropna(how = 'all')

# Generate a partitioning column
df2 = df1.withColumn('date', df1.date_time.cast('date'))

# Write out in parquet format, partitioned by date, to the S3 location specified in the arguments
df2.write.format("parquet") \
    .partitionBy("date") \
    .mode("append") \
    .save(args['s3target'])

job.commit()

My question is: without job bookmarks, each time this runs it processes the same S3 files over again. How can I move processed files in the source S3 bucket to a subfolder, or otherwise avoid processing the same files twice?

I'm not sure what the trick is here, since Spark is a parallel system and I don't really even know which files it read. I guess I could create a second Glue job with the Python Shell job type and delete the incoming files immediately afterwards, but even then I'm not sure which files to delete, etc.

Thanks,

Chris



2 Answers

To move the processed files out of the input source prefix, or to delete them, you'll have to use boto3 (or the AWS CLI) directly.

To identify which files to process, you can proceed in two different ways:

  • resolve your file glob args['s3source'] + "/*.tsv.gz" using boto3 with s3_client.list_objects_v2() before using Spark. You can then pass the array of resolved files to spark.read.load instead of a glob.
import boto3
client = boto3.client('s3')

# get all the available files
# Note: if you expect more than 1000 files, you need to iterate over the
# pages of results (see the paginator sketch after this list)

response = client.list_objects_v2(Bucket=your_bucket_name, Prefix=your_path_prefix)
files = ['s3://' + your_bucket_name + '/' + obj['Key']
         for obj in response['Contents'] if obj['Key'].endswith('.tsv.gz')]

 ... initialize your job as before ...

df0 = spark.read.format("com.databricks.spark.csv") \
    .option("quote", "\"") \
    .option("delimiter", u'\u0009') \
    .option("charset", 'utf-8') \
    .schema(schema) \
    .load(files)

 ... do your work as before ...
  • use the fact that Spark keeps track of all its input files to post-process them after a successful save:
 ... process your files with pyspark as before...

# retrieve the tracked files from the initial DataFrame
# you need to access the java RDD instances to get to the partitions information
# The file URIs will be in the following format: u's3://mybucket/mypath/myfile.tsv.gz'

files = []
for p in df0.rdd._jrdd.partitions():
    files.extend([f.filePath() for f in p.files().array()])
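
If the source prefix can hold more than 1000 objects, a single list_objects_v2() call will not return everything. Here is a minimal sketch of the listing step using a boto3 paginator, assuming the same your_bucket_name and your_path_prefix placeholders as above:

import boto3

client = boto3.client('s3')

# iterate over every page of results instead of relying on a single
# list_objects_v2() call (which returns at most 1000 keys)
files = []
paginator = client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=your_bucket_name, Prefix=your_path_prefix):
    for obj in page.get('Contents', []):
        if obj['Key'].endswith('.tsv.gz'):
            files.append('s3://' + your_bucket_name + '/' + obj['Key'])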

Once you have the file list, deleting them, renaming them, or adding them to a metadata store (so they can be filtered out on the next job run) is pretty straightforward.

For example, to delete them:

# initialize an S3 client if not already done
import boto3
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2
client = boto3.client('s3')

# do what you want with the URIs, for example delete them
for uri in files:
    parsed = urlparse(uri)
    client.delete_object(Bucket=parsed.netloc, Key=parsed.path.lstrip('/'))
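
Since the question specifically asks about moving processed files to a subfolder, here is a minimal sketch of that variant. S3 has no rename operation, so a "move" is a copy followed by a delete; the processed/ prefix below is only an example, not something taken from the original job:

# move each processed file under a "processed/" prefix instead of deleting it
# (copy to the new key, then delete the original)
for uri in files:
    parsed = urlparse(uri)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')
    client.copy_object(Bucket=bucket,
                       CopySource={'Bucket': bucket, 'Key': key},
                       Key='processed/' + key)
    client.delete_object(Bucket=bucket, Key=key)

Note that copy_object only handles objects up to 5 GB; for larger files you would need a multipart copy (for example via the client's managed copy() method).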


If you're not concerned about processing the same source files again (in terms of run time) and your use case is simply not to have duplicated data in the destination, you may consider changing the save mode to "overwrite" when writing the DataFrame.

https://spark.apache.org/docs/2.1.1/api/java/org/apache/spark/sql/DataFrameWriter.html
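
Applied to the job above, only the final write step changes (a sketch; note that with Spark's default settings "overwrite" replaces everything under the target path, not just the date partitions touched by the current run):

# overwrite the destination instead of appending to it
df2.write.format("parquet") \
    .partitionBy("date") \
    .mode("overwrite") \
    .save(args['s3target'])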



