
Adding a column in AWS glue dynamic dataframe

I am very new to AWS Glue. I am working on a small project where I need to read a file from an S3 bucket, transpose it, and load it into a MySQL table. The source data in the S3 bucket looks like this:

    +----+----+-------+-----+---+--+--------+
    |cost|data|minutes|name |sms|id|category|
    +----+----+-------+-----+---+--+--------+
    |  5 |1000|  200  |prod1|500|p1|service |
    +----+----+-------+-----+---+--+--------+

The target table has the columns product_id, parameter, and value.

I expect the target table to contain the following values:

p1, cost, 5

p1, data, 1000

I am able to load the target table with the ID and value, but I cannot populate the parameter column. This column is not present in the input data; I want to fill it with a string that names the source column each value came from.

Here is the code I used for cost.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

## @type: DataSource
## @args: [database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "mainclouddb", table_name = "s3product", transformation_ctx = "datasource0")

## @type: ApplyMapping
## @args: [mapping = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("cost", "long", "value", "int"), ("id", "string", "product_id", "string")], transformation_ctx = "applymapping1")

## @type: SelectFields
## @args: [paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2"]
## @return: selectfields2
## @inputs: [frame = applymapping1]
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["product_id", "parameter", "value"], transformation_ctx = "selectfields2")

## @type: ResolveChoice
## @args: [choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3"]
## @return: resolvechoice3
## @inputs: [frame = selectfields2]
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "resolvechoice3")

## @type: ResolveChoice
## @args: [choice = "make_cols", transformation_ctx = "resolvechoice4"]
## @return: resolvechoice4
## @inputs: [frame = resolvechoice3]
resolvechoice4 = ResolveChoice.apply(frame = resolvechoice3, choice = "make_cols", transformation_ctx = "resolvechoice4")

## @type: DataSink
## @args: [database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5"]
## @return: datasink5
## @inputs: [frame = resolvechoice4]
datasink5 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice4, database = "mainclouddb", table_name = "mysqlmaincloud_product_parameter_mapping", transformation_ctx = "datasink5")

job.commit()

Can somebody help me add this new column to my data frame so that it is available in the table?

Thanks

GK1 asked Apr 09 '26 12:04


1 Answer

For a smaller dataset you can do the following:

  1. Convert the dynamic frame to a Spark DataFrame
  2. Add a column
  3. Convert back to a dynamic frame

step 1

datasource0 = datasource0.toDF()

step 2

from pyspark.sql.functions import col, udf
# Replace val + 1 with whatever logic you need; without an explicit returnType the UDF yields a string column
getNewValues = udf(lambda val: val + 1)

datasource0 = datasource0.withColumn('New_Col_Name', getNewValues(col('some_existing_col')))

step 3

from awsglue.dynamicframe import DynamicFrame
datasource0 = DynamicFrame.fromDF(datasource0, glueContext, "datasource0")
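For the transpose described in the question, step 2 could produce the parameter column and the value column together instead of adding one column at a time. The following is a minimal sketch, not a tested Glue job: it assumes the value columns are numeric (as cost, data, minutes and sms are in the sample row), and the helper names build_stack_expr and unpivot are hypothetical. It relies on Spark SQL's stack() generator through DataFrame.selectExpr.

```python
def build_stack_expr(value_cols, key_name="parameter", value_name="value"):
    """Build a Spark SQL stack() expression that turns columns into rows."""
    # Each column contributes a pair: its name as a string literal, then its value.
    # The cast keeps every stacked value the same type (assumes numeric columns).
    pairs = ", ".join(f"'{c}', cast(`{c}` as bigint)" for c in value_cols)
    return f"stack({len(value_cols)}, {pairs}) as ({key_name}, {value_name})"


def unpivot(df, id_col, value_cols):
    """Return one (product_id, parameter, value) row per id and value column."""
    return df.selectExpr(f"`{id_col}` as product_id", build_stack_expr(value_cols))


# Column names taken from the sample row in the question.
print(build_stack_expr(["cost", "data"]))

# Inside the Glue job (assumption: datasource0 is still a DynamicFrame here):
#   long_df = unpivot(datasource0.toDF(), "id", ["cost", "data", "minutes", "sms"])
#   long_dyf = DynamicFrame.fromDF(long_df, glueContext, "long_dyf")
```

With the sample row this should yield rows such as (p1, cost, 5) and (p1, data, 1000), which can then go through the existing ResolveChoice and DataSink steps.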

The catch is that on a large dataset the toDF() operation can be very expensive!
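If the conversion is a concern, one alternative is to stay on the dynamic frame and set the new field record by record with Glue's Map transform. This is only a sketch under assumptions: the helper names are hypothetical, it would be applied to the output of the question's ApplyMapping step (which already has product_id and value), and whether it is actually cheaper than toDF() for a given job would need to be measured.

```python
def make_parameter_setter(parameter_name):
    """Return a per-record function suitable for Glue's Map transform."""
    def set_parameter(record):
        # Records passed to Map support dict-style field assignment.
        record["parameter"] = parameter_name
        return record
    return set_parameter


def add_parameter_column(frame, parameter_name):
    """Apply the setter to every record of a DynamicFrame."""
    # Imported lazily so the setter above can be tried outside a Glue job.
    from awsglue.transforms import Map
    return Map.apply(frame=frame, f=make_parameter_setter(parameter_name))


# Local check with a plain dict standing in for one mapped record.
print(make_parameter_setter("cost")({"product_id": "p1", "value": 5}))

# Inside the Glue job (assumption: applymapping1 comes from the question's script):
#   withparameter = add_parameter_column(applymapping1, "cost")
```

Map works one record in, one record out, so it cannot fan a single row out into several; for a full transpose it would have to run once per source column.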

ppeiris answered Apr 12 '26 06:04


