AWS Glue - Truncate destination postgres table prior to insert

I am trying to truncate a Postgres destination table prior to insert and, more generally, to run external functions using the connections already created in Glue.

Has anyone been able to do so?

Josh Hamann asked Nov 02 '17 17:11



1 Answer

I tried the DROP/TRUNCATE scenario but could not get it to work with the connections already created in Glue. It does work with a pure-Python PostgreSQL driver, pg8000.

  1. Download the pg8000 tar archive from PyPI and extract it
  2. Create an empty __init__.py in the root folder
  3. Zip up the contents and upload the zip to S3
  4. Reference the zip file in the Python lib path of the job
  5. Set the DB connection details as job parameters (make sure to prepend all key names with --). Tick the "Server-side encryption" box.
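
Steps 2 and 3 can also be scripted. The following is only a minimal sketch, assuming the pg8000 archive has already been extracted to a local folder. The function name `package_library` and both paths are hypothetical, and the upload to S3 is left out (the console or any S3 client would do).

```python
import os
import zipfile


def package_library(src_dir, zip_path):
    """Zip the contents of src_dir, adding an empty __init__.py at its root if missing.

    Hypothetical helper: zip_path should live outside src_dir so the
    archive does not end up containing itself.
    """
    init_path = os.path.join(src_dir, "__init__.py")
    if not os.path.exists(init_path):
        # Step 2: create an empty __init__.py in the root folder.
        open(init_path, "w").close()

    # Step 3: zip the contents, storing paths relative to src_dir.
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for root, _dirs, files in os.walk(src_dir):
            for name in files:
                full_path = os.path.join(root, name)
                archive.write(full_path, os.path.relpath(full_path, src_dir))
    return zip_path
```

Calling `package_library("pg8000-src", "pg8000.zip")` (both names are placeholders) would produce the zip file to upload and reference in the job's Python lib path.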

Then you can simply create a connection and execute SQL.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

import pg8000

args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context
sc = SparkContext()
glueContext = GlueContext(sc)

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...
config_port = 5432
conn = pg8000.connect(
    database=args['DB'], 
    user=args['USER'], 
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
# schema and table are assumed to be defined earlier in the job (not shown here)
query = "TRUNCATE TABLE {0};".format(".".join([schema, table]))
cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()
conn.close()
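
Since the TRUNCATE statement above is built with plain string formatting, it may be worth quoting the schema and table names if they come from job parameters. This is only a sketch of one possible approach: `quote_ident` and `build_truncate` are hypothetical helpers, not part of pg8000 or Glue. Note that quoted identifiers are case-sensitive in PostgreSQL.

```python
def quote_ident(name):
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    if not name or "\x00" in name:
        raise ValueError("invalid identifier: {!r}".format(name))
    return '"' + name.replace('"', '""') + '"'


def build_truncate(schema, table):
    """Return a TRUNCATE statement for schema.table with both parts quoted."""
    return "TRUNCATE TABLE {0}.{1};".format(quote_ident(schema), quote_ident(table))
```

With these helpers, the query line could become `query = build_truncate(schema, table)`, and the rest of the code stays the same.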

thenaturalist answered Sep 19 '22 15:09