
How do I run SQL SELECT on AWS Glue created Dataframe in Spark?

I have the following job in AWS Glue which basically reads data from one table and extracts it as a CSV file in S3. However, I want to run a query on this table (a SELECT with SUM and GROUP BY) and write that output to CSV. How do I do this in AWS Glue? I am new to Spark, so please help.

import sys
from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table from the Glue Data Catalog as a DynamicFrame
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="db1",
    table_name="dbo1_expdb_dbo_stg_plan",
    transformation_ctx="datasource0")

# Keep the plan_code and plan_id columns, both as int
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("plan_code", "int", "plan_code", "int"),
              ("plan_id", "int", "plan_id", "int")],
    transformation_ctx="applymapping1")

# Write the mapped DynamicFrame to S3 as CSV
datasink2 = glueContext.write_dynamic_frame.from_options(
    frame=applymapping1,
    connection_type="s3",
    connection_options={"path": "s3://bucket"},
    format="csv",
    transformation_ctx="datasink2")
job.commit()
asked by EngineJanwaar

1 Answer

The "create_dynamic_frame.from_catalog" function of glue context creates a dynamic frame and not dataframe. And dynamic frame does not support execution of sql queries.

To execute sql queries you will first need to convert the dynamic frame to dataframe, register a temp table in spark's memory and then execute the sql query on this temp table.

Sample code:

from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
spark_session = glueContext.spark_session
sqlContext = SQLContext(spark_session.sparkContext, spark_session)

# Read the catalog table as a DynamicFrame
DyF = glueContext.create_dynamic_frame.from_catalog(database="{{database}}", table_name="{{table_name}}")

# Convert to a Spark DataFrame and register it as a temp table
df = DyF.toDF()
df.registerTempTable('{{name}}')

# Run your SQL query against the temp table registered above
df = sqlContext.sql('{{your select query with table name that you used for temp table above}}')

# Write the result, e.g. partitioned ORC/Parquet, to S3
df.write.format('{{orc/parquet/whatever}}').partitionBy("{{columns}}").save('path to s3 location')
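
Applied to the job in the question, a minimal sketch could look like the following. The database, table, and column names (db1, dbo1_expdb_dbo_stg_plan, plan_code, plan_id) come from the question; the specific aggregation (summing plan_id per plan_code) and the output path are assumptions to adapt to your actual schema and bucket:

# Sketch only: the aggregation below (SUM of plan_id per plan_code) and the
# output path are assumptions -- adjust them to your schema and bucket.
stg_plan_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="db1", table_name="dbo1_expdb_dbo_stg_plan")

stg_plan_df = stg_plan_dyf.toDF()
stg_plan_df.registerTempTable("stg_plan")

result_df = sqlContext.sql(
    "SELECT plan_code, SUM(plan_id) AS plan_id_total "
    "FROM stg_plan GROUP BY plan_code")

# coalesce(1) yields a single CSV file; drop it for large results
result_df.coalesce(1).write.format("csv") \
    .option("header", "true") \
    .save("s3://bucket/query_output")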
answered by Harsh Bafna
