
How to run arbitrary / DDL SQL statements or stored procedures using AWS Glue

Is it possible to execute arbitrary SQL commands like ALTER TABLE from an AWS Glue Python job? I know I can use it to read data from tables, but is there a way to execute other database-specific commands?

I need to ingest data into a target database and then run some ALTER commands right after.

asked Nov 10 '20 by mishkin

People also ask

How do you call a snowflake stored procedure in AWS Glue?

Create a sample stored procedure in Snowflake. In Glue, select Add Job with an appropriate Name and IAM role, type Python Shell, and Python version 3. Save the job and run it, then check the Snowflake history to see the procedure being called.

Does AWS Glue use SQL?

AWS Glue Studio now provides the option to define transforms using SQL queries, allowing you to perform aggregations, apply filter logic to your data, add calculated fields, and more. This feature makes it easy to mix SQL queries with AWS Glue Studio's visual transforms while authoring ETL jobs.

Can AWS Glue connect to SQL Server?

AWS Glue can connect to the following data stores through a JDBC connection: Amazon Redshift. Amazon Aurora. Microsoft SQL Server.
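For orientation, the JDBC URL shape differs per engine. A quick sketch of typical URL templates for some of the stores listed above (hostnames, ports, and database names are placeholders, not real endpoints):

```python
# Typical JDBC URL templates for data stores reachable over JDBC.
# All hostnames/ports/database names below are placeholders.
jdbc_urls = {
    "redshift": "jdbc:redshift://examplecluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev",
    "postgresql": "jdbc:postgresql://mydb.abc123.us-east-1.rds.amazonaws.com:5432/mydb",
    "sqlserver": "jdbc:sqlserver://mydb.abc123.us-east-1.rds.amazonaws.com:1433;databaseName=mydb",
    "mysql": "jdbc:mysql://mydb.abc123.us-east-1.rds.amazonaws.com:3306/mydb",
}

# Each URL starts with "jdbc:" followed by the engine's subprotocol.
for engine, url in jdbc_urls.items():
    assert url.startswith("jdbc:" + engine), engine
```

Note that SQL Server uses `;databaseName=` to select the database, while the PostgreSQL-family drivers put the database name in the URL path.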


2 Answers

So after doing extensive research and also opening a case with AWS support, they told me it is not possible from a Python shell or Glue PySpark job at the moment. But I tried something creative and it worked! The idea is to use py4j, which Spark already relies on, together with the standard java.sql package.

Two huge benefits of this approach:

  1. You can define your database connection as a Glue data connection and keep the JDBC details and credentials there, without hardcoding them in the Glue code. My example below does that by calling glueContext.extract_jdbc_conf('your_glue_data_connection_name') to get the JDBC URL and credentials defined in Glue.

  2. If you need to run SQL commands against a database Glue supports out of the box, you don't even need to supply a JDBC driver for it - just set up a Glue connection for that database and add the connection to your Glue job, and Glue will upload the proper database driver jars.

Remember that the code below is executed by the driver process and cannot be run by Spark workers/executors.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

logger = glueContext.get_logger()

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# get the JDBC url and credentials from the Glue data connection
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")

conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))

print(conn.getMetaData().getDatabaseProductName())

# call a stored procedure; here SQL Server's dbo.sp_start_job, binding the
# @job_name parameter by name
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}")
cstmt.setString("job_name", "testjob")
results = cstmt.execute()

conn.close()
job.commit()
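The same Connection object also answers the original ALTER TABLE question: create a java.sql.Statement and pass the DDL string to executeUpdate. Since the py4j/Glue context above needs a live JVM, here is a runnable sketch of the identical ingest-then-ALTER flow using Python's built-in sqlite3 as a stand-in for the JDBC connection (table and column names are made up for illustration):

```python
import sqlite3

# Stand-in for the JDBC connection above; the pattern (execute an
# arbitrary DDL string on a live connection) is the same.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Ingest some data into the target table.
cur.execute("CREATE TABLE staging (id INTEGER, payload TEXT)")
cur.executemany("INSERT INTO staging VALUES (?, ?)", [(1, "a"), (2, "b")])

# Run ALTER commands right after the load, as the question asks.
cur.execute("ALTER TABLE staging ADD COLUMN loaded_at TEXT")

# Confirm the new column exists.
cols = [row[1] for row in cur.execute("PRAGMA table_info(staging)")]
print(cols)  # the list now includes loaded_at
conn.close()
```

On the JDBC side the ALTER would be `stmt = conn.createStatement()` followed by `stmt.executeUpdate("ALTER TABLE ...")`, using the `conn` built in the script above.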
answered Sep 24 '22 by mishkin

I finally got this working after a couple of hours so hopefully the following will be helpful. My script is heavily influenced by the earlier responses, thank you.

Prerequisites:

  • You will want the Glue connection configured and tested before attempting any scripts.
  • When setting up your AWS Glue job, use Spark, Glue version 2.0 or later, and Python version 3.
  • I recommend configuring this job with just 2 workers to save on cost; the bulk of the work is going to be done by the database, not by Glue.
  • The following is tested with an AWS RDS PostgreSQL instance, but is hopefully flexible enough to work for other databases.
  • The script needs 3 parameters updated near the top of the script (glue_connection_name, database_name, and stored_proc).
  • The JOB_NAME, connection string, and credentials are retrieved by the script and do not need to be supplied.
  • If your stored proc will return a dataset then replace executeUpdate with executeQuery.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
    
glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
    
#Below this point no changes should be necessary.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']
    
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
    
logger = glueContext.get_logger()
    
logger.info('Getting details for connection ' + glue_connection_name)
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
    
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
    
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
    
stmt = conn.createStatement()
rows_affected = stmt.executeUpdate('call ' + stored_proc)
    
conn.close()
logger.info('Finished')
job.commit()
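As the prerequisites note, a procedure that returns a dataset needs executeQuery instead of executeUpdate, followed by iterating the java.sql.ResultSet (`rs = stmt.executeQuery(...)`, then a `while rs.next():` loop calling getters like rs.getString). A runnable sketch of that read path using sqlite3 in place of the JVM-backed connection (the table here is invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE results (name TEXT)")
cur.executemany("INSERT INTO results VALUES (?)", [("a",), ("b",)])

# executeQuery analog: run a statement that returns rows, then iterate them,
# just as you would walk a ResultSet with rs.next()/rs.getString(1).
rows = cur.execute("SELECT name FROM results ORDER BY name").fetchall()
names = [r[0] for r in rows]
print(names)
conn.close()
```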
answered Sep 21 '22 by Dan Crosby