Is it possible to execute arbitrary SQL commands like ALTER TABLE from an AWS Glue Python job? I know I can use it to read data from tables, but is there a way to execute other database-specific commands?
I need to ingest data into a target database and then run some ALTER commands right after.
Create a sample stored procedure in Snowflake. Select Add Job and give it an appropriate name, an IAM role, type Python Shell, and Python version Python 3. Save the job and run it, then check the Snowflake history to see the procedure being called.
AWS Glue Studio now provides the option to define transforms using SQL queries, allowing you to perform aggregations, easily apply filter logic to your data, add calculated fields, and more. This feature makes it easy to seamlessly mix SQL queries with AWS Glue Studio's visual transforms while authoring ETL jobs.
AWS Glue can connect to the following data stores through a JDBC connection: Amazon Redshift. Amazon Aurora. Microsoft SQL Server.
After doing extensive research and also opening a case with AWS support, I was told this is not possible from a Python shell or Glue PySpark job at this moment. But I tried something creative and it worked! The idea is to use py4j, which Spark already relies on, and the standard java.sql package.
Two huge benefits of this approach:
1. You can define your database connection as a Glue data connection and keep the JDBC details and credentials there instead of hardcoding them in the Glue code. My example below does that by calling glueContext.extract_jdbc_conf('your_glue_data_connection_name') to get the JDBC URL and credentials defined in Glue.
2. If you need to run SQL commands on a database that Glue supports out of the box, you don't even need to supply a JDBC driver for that database. Just make sure you set up a Glue connection for that database and add that connection to your Glue job; Glue will upload the proper database driver JARs.
Remember that the code below is executed by the driver process and cannot be executed by Spark workers/executors.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read the JDBC URL and credentials from the Glue connection
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')
# Make the java.sql classes visible on the JVM view through py4j
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
print(conn.getMetaData().getDatabaseProductName())
# Call a stored procedure; in this case I call sp_start_job
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}")
cstmt.setString("job_name", "testjob")
results = cstmt.execute()
# Release the statement and the connection once the call has finished
cstmt.close()
conn.close()
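The question asked about ALTER TABLE rather than a stored procedure. Here is a minimal sketch of how the same kind of JDBC connection could run plain DDL statements, assuming `conn` behaves like the `java.sql.Connection` returned by `DriverManager.getConnection` above. The helper name `run_statements` is hypothetical and not part of AWS Glue. It relies only on `createStatement()`, `execute()`, and `close()` from the standard java.sql API.

```python
def run_statements(conn, statements):
    """Run each SQL string on a JDBC connection and return how many ran.

    `conn` is expected to behave like java.sql.Connection (for example the
    object returned by DriverManager.getConnection through py4j).
    Hypothetical helper for illustration, not part of AWS Glue.
    """
    executed = 0
    stmt = conn.createStatement()
    try:
        for sql in statements:
            # Statement.execute() accepts any SQL, including DDL such as ALTER TABLE
            stmt.execute(sql)
            executed += 1
    finally:
        # Release the statement even if one of the commands fails
        stmt.close()
    return executed
```

In a Glue job this could be called right after the data load, for example `run_statements(conn, ["ALTER TABLE my_table ADD loaded_at TIMESTAMP"])`, where the table and column names are placeholders. The caller stays responsible for closing `conn`, so one connection can be reused for several batches.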
I finally got this working after a couple of hours so hopefully the following will be helpful. My script is heavily influenced by the earlier responses, thank you.
Prerequisites:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
glue_connection_name = '[Name of your glue connection (not the job name)]'
database_name = '[name of your postgreSQL database]'
stored_proc = '[Stored procedure call, for example public.mystoredproc()]'
#Below this point no changes should be necessary.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_job_name = args['JOB_NAME']
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(glue_job_name, args)
logger = glueContext.get_logger()
logger.info('Getting details for connection ' + glue_connection_name)
# Read the JDBC URL and credentials from the Glue connection
source_jdbc_conf = glueContext.extract_jdbc_conf(glue_connection_name)
from py4j.java_gateway import java_import
java_import(sc._gateway.jvm,"java.sql.Connection")
java_import(sc._gateway.jvm,"java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm,"java.sql.DriverManager")
java_import(sc._gateway.jvm,"java.sql.SQLException")
# The URL from the connection has no database name here, so append it
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url') + '/' + database_name, source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
logger.info('Connected to ' + conn.getMetaData().getDatabaseProductName() + ', ' + source_jdbc_conf.get('url') + '/' + database_name)
# Call the stored procedure
stmt = conn.createStatement()
rs = stmt.executeUpdate('call ' + stored_proc)
# Release the statement and the connection
stmt.close()
conn.close()
logger.info("Finished")
# Commit the job only after the work has completed
job.commit()
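The script above builds the JDBC URL by appending '/' and the database name to the 'url' value. Here is a small sketch of that step as a helper, assuming the configuration object supports `.get()` like a dict. The name `build_jdbc_url` is hypothetical. It also assumes, based on the AWS documentation for Glue 3.0 and later, that the configuration may contain a 'fullUrl' key holding the URL exactly as entered on the connection; if your Glue version does not return it, the fallback path is used. The '/' separator fits PostgreSQL-style URLs; other databases may use a different syntax.

```python
def build_jdbc_url(jdbc_conf, database_name):
    """Return a JDBC URL that includes the database name.

    Assumption: `jdbc_conf` is the dict-like result of
    glueContext.extract_jdbc_conf(). If it contains 'fullUrl' (documented
    for Glue 3.0+), that value is returned unchanged; otherwise the database
    name is appended to 'url' with a single '/' separator.
    Hypothetical helper for illustration, not part of AWS Glue.
    """
    full_url = jdbc_conf.get('fullUrl')
    if full_url:
        return full_url
    # Avoid a double slash if the base URL already ends with '/'
    base_url = jdbc_conf.get('url').rstrip('/')
    return base_url + '/' + database_name
```

The result could then be passed to `DriverManager.getConnection` in place of the inline string concatenation, which also keeps the log message and the connection call in agreement.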