How do I verify my bookmarks are working? I find that when I run a job immediately after the previous finishes, it seem to still take a long time. Why is that? I thought it will not read the files it already processed? The script looks like below:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
inputGDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://xxx-glue/testing-csv"], "recurse": True}, format = "csv", format_options = {"withHeader": True}, transformation_ctx="inputGDF")
if bool(inputGDF.toDF().head(1)):
print("Writing ...")
inputGDF.toDF() \
.drop("createdat") \
.drop("updatedat") \
.write \
.mode("append") \
.partitionBy(["querydestinationplace", "querydatetime"]) \
.parquet("s3://xxx-glue/testing-parquet")
else:
print("Nothing to write ...")
job.commit()
import boto3
glue_client = boto3.client('glue', region_name='ap-southeast-1')
glue_client.start_crawler(Name='xxx-testing-partitioned')
The log looks like:
18/12/11 14:49:03 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:03 DEBUG Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
18/12/11 14:49:04 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:04 DEBUG Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
18/12/11 14:49:05 INFO Client: Application report for application_1544537674695_0001 (state: RUNNING)
18/12/11 14:49:05 DEBUG Client:
client token: N/A
diagnostics: N/A
ApplicationMaster host: 172.31.2.72
ApplicationMaster RPC port: 0
queue: default
start time: 1544539297014
final status: UNDEFINED
tracking URL: http://ip-172-31-0-204.ap-southeast-1.compute.internal:20888/proxy/application_1544537674695_0001/
user: root
...
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-15_2018-11-19.csv:0+1194081
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-14_2018-11-18.csv' for reading
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-15_2018-11-19.csv' for reading
18/12/11 14:42:00 INFO Executor: Finished task 89.0 in stage 0.0 (TID 89). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 92
18/12/11 14:42:00 INFO Executor: Running task 92.0 in stage 0.0 (TID 92)
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-16_2018-11-20.csv:0+1137753
18/12/11 14:42:00 INFO Executor: Finished task 88.0 in stage 0.0 (TID 88). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 93
18/12/11 14:42:00 INFO Executor: Running task 93.0 in stage 0.0 (TID 93)
18/12/11 14:42:00 INFO NewHadoopRDD: Input split: s3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-17_2018-11-21.csv:0+1346626
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-16_2018-11-20.csv' for reading
18/12/11 14:42:00 INFO S3NativeFileSystem: Opening 's3://pinfare-glue/testing-csv/2018-09-25/DPS/2018-11-17_2018-11-21.csv' for reading
18/12/11 14:42:00 INFO Executor: Finished task 90.0 in stage 0.0 (TID 90). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO Executor: Finished task 91.0 in stage 0.0 (TID 91). 2088 bytes result sent to driver
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 94
18/12/11 14:42:00 INFO CoarseGrainedExecutorBackend: Got assigned task 95
18/12/11 14:42:00 INFO Executor: Running task 95.0 in stage 0.0 (TID 95)
18/12/11 14:42:00 INFO Executor: Running task 94.0 in stage 0.0 (TID 94)
... I notice the parquet is appended with alot of duplicate data ... Is the bookmark not working? Its already enabled
From the docs
Job must be created with --job-bookmark-option
job-bookmark-enable
(or if using the console then in the console options). Job must also have a jobname; this will be passed in automatically.
Job must start with a job.init(jobname)
e.g.
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
Job must have a job.commit()
to save the state of the bookmark and finish successfully.
The datasource must be either s3 source or JDBC (limited, and not your usecase so I will ignore it).
The example in the docs shows creating a dynamicframe from the (Glue/Lake formation) catalog using the tablename, not an explicit S3 path. This implies that reading from the catalog is still considered an S3 source; the underlying files will be on S3.
Files on s3 must be one of JSON, CSV, Apache Avro, XML for version 0.9 and above, or can be Parquet or ORC for version 1.0 and above
The datasource in the script must have a transformation_ctx
parameter.
The docs say
pass the transformation_ctx parameter only to those methods that you want to enable bookmarks You could add this to every transform for saving state but the critical one(s) are the datasource(s) you want to bookmark.
From the docs
job.commit()
and using the transformation_ctx
as aboveFor Amazon S3 input sources, job bookmarks check the last modified time of the objects, rather than the file names, to verify which objects need to be reprocessed. If your input source data has been modified since your last job run, the files are reprocessed when you run the job again.
Have you verified that your CSV files in the path "s3://xxx-glue/testing-csv"
do not already contain duplicates? You could use a Glue crawler or write DDL in Athena to create a table over them and look directly. Alternatively create a dev endpoint and run a zeppelin or sagemaker notebook and step through your code.
It doesn't mention anywhere that editing your script would reset your state, however, if you modified the transformation_ctx
of the datasource or other stages then that would likely impact the state, however I haven't verified that. The job has a Jobname
which keys the state, along with the run number, attempt number and version number that are used to manage retries and the latest state, which implies that minor changes to the script wouldn't affect the state as long as the Jobname
is consistent, but again I haven't verified that.
As an aside, in your code you test for inputGDF.toDF().head(1)
and then run inputGDF.toDF()...
to write the data. Spark is lazily evaluated but in that case you are running an equivalent dynamicframe to dataframe twice, and spark can't cache or reuse it. Better to do something like df = inputGDF.toDF()
before the if
and then reuse the df
twice.
Please check this doc about AWS Glue bookmarking mechanism.
Basically it requires to enable it via Console (or CloudFormation) and specify tranformation_context
parameter which uses together with some other attributes (like job name, source file names) to save checkpointing information. If you change value of one of these attributes then Glue will treat it as different checkpoint.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With