I am trying to truncate a PostgreSQL destination table prior to insert and, more generally, to run external functions using the connections already created in AWS Glue.
Has anyone been able to do so?
There are 3 steps you need to do to be able to use pg8000 in your Glue ETL jobs. Download the pg8000 archive file, re-zip its contents and copy the zip to an AWS S3 folder. Make your Glue ETL job aware of the location of the zip file. Start using the module in Glue as you would in any regular python program.
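The re-zip step can be sketched with the standard library alone. This is a minimal sketch, not an official procedure: `zip_package`, `source_dir`, and `zip_path` are hypothetical names, and the upload to S3 is left out. It assumes `source_dir` is the extracted archive folder, so that the `pg8000` package directory ends up at the top level of the zip, which is where Python's zip import looks for it.

```python
import os
import zipfile


def zip_package(source_dir, zip_path):
    """Zip the contents of source_dir with paths relative to source_dir.

    Returns the sorted list of names stored in the archive.
    zip_path should live outside source_dir so the archive does not include itself.
    """
    names = []
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(source_dir):
            for filename in files:
                full_path = os.path.join(root, filename)
                # Store each file relative to source_dir, not with its absolute path
                arcname = os.path.relpath(full_path, source_dir)
                archive.write(full_path, arcname)
                names.append(arcname.replace(os.sep, "/"))
    return sorted(names)
```

Calling `zip_package("pg8000-src", "pg8000.zip")` (both paths are placeholders) would produce a zip that can then be copied to S3 and referenced from the job.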
A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type.
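To make the idea of a choice type concrete, here is a toy illustration in plain Python. It is not how AWS Glue implements DynamicFrame and uses no Glue API; `infer_schema` and the sample records are made up for the example. It only shows what "computing a schema on the fly and recording inconsistencies" can look like.

```python
def infer_schema(records):
    """Infer a field -> type mapping from self-describing records.

    When records disagree on a field's type, the field is recorded as a
    {"choice": [...]} entry instead of forcing a single type.
    """
    seen = {}
    for record in records:
        for field, value in record.items():
            seen.setdefault(field, set()).add(type(value).__name__)

    schema = {}
    for field, types in seen.items():
        if len(types) > 1:
            schema[field] = {"choice": sorted(types)}
        else:
            schema[field] = next(iter(types))
    return schema


records = [
    {"id": 1, "price": 9.99},
    {"id": "2", "price": 12.5, "note": "id arrived as a string"},
]
schema = infer_schema(records)
```

Here `id` comes out as a choice between `int` and `str`, while `price` and `note` get a single type. In Glue itself, such ambiguities are typically resolved with the DynamicFrame `resolveChoice` method.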
To get started, sign in to the AWS Management Console and open the AWS Glue console at https://console.aws.amazon.com/glue/. Choose the Tables tab, and use the Add tables button to create tables either with a crawler or by manually typing attributes.
GlueContext is the entry point for reading and writing a DynamicFrame from and to Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, JDBC, and so on. This class provides utility functions to create DataSource trait and DataSink objects that can in turn be used to read and write DynamicFrames.
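As a rough sketch of that read/write role in a PySpark job: the helper below reads a Data Catalog table and writes it through an existing Glue connection. The function name and all argument names are hypothetical; `glue_context` is assumed to be a `GlueContext` created in the job. Note that this path only inserts rows; it does not truncate the target table first.

```python
def copy_catalog_table_to_jdbc(glue_context, catalog_db, catalog_table,
                               connection_name, target_db, target_table):
    """Read a Data Catalog table and write it to a JDBC target.

    glue_context is expected to be an awsglue.context.GlueContext;
    connection_name is the name of a connection defined in Glue.
    """
    # Read the source table into a DynamicFrame
    frame = glue_context.create_dynamic_frame.from_catalog(
        database=catalog_db,
        table_name=catalog_table,
    )
    # Write the DynamicFrame using the connection stored in Glue
    glue_context.write_dynamic_frame.from_jdbc_conf(
        frame=frame,
        catalog_connection=connection_name,
        connection_options={"dbtable": target_table, "database": target_db},
    )
    return frame
```

Passing the context in as an argument keeps the helper free of job setup code, so it can be exercised with a stand-in object outside Glue.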
I've tried the DROP/TRUNCATE scenario. I have not been able to do it with connections already created in Glue, but it does work with a pure Python PostgreSQL driver, pg8000.
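To set this up, add an __init__.py file in the root folder of the pg8000 archive before zipping it, reference the zip in the Python lib path of the job, and pass the database connection details as job parameters (key names prefixed with --). Tick the "Server-side encryption" box. Then you can simply create a connection and execute SQL.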
import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
import pg8000
# Read the job name and the database connection details from the job parameters
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# ...
config_port = 5432
conn = pg8000.connect(
    database=args['DB'],
    user=args['USER'],
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
# schema and table are assumed to be defined in the elided part of the script
query = "TRUNCATE TABLE {0};".format(".".join([schema, table]))
cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()
conn.close()
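Since the table name is formatted straight into the SQL string, it may be worth quoting the identifiers when `schema` or `table` come from job parameters. The following is a minimal sketch under that assumption; `quote_ident`, `build_truncate_query`, and `truncate_table` are hypothetical helpers, not part of pg8000. `conn` can be any DB-API connection, such as the one returned by `pg8000.connect` above.

```python
def quote_ident(name):
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_truncate_query(schema, table):
    """Build a TRUNCATE statement with both identifiers quoted."""
    return "TRUNCATE TABLE {0}.{1};".format(quote_ident(schema), quote_ident(table))


def truncate_table(conn, schema, table):
    """Truncate schema.table; commit on success, roll back on error."""
    cur = conn.cursor()
    try:
        cur.execute(build_truncate_query(schema, table))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        # Always release the cursor; the caller decides when to close conn
        cur.close()
```

One caveat: quoted identifiers are case-sensitive in PostgreSQL, so the names passed in must match the stored names exactly (lowercase for tables created without quotes).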