My Airflow script has several tasks that use the PythonOperator to start a Glue job via boto3. Even though the tasks are chained as dependencies in Airflow, each downstream task does not wait for the previous task's Glue job to complete successfully.
The next Glue job task is triggered as soon as the previous Glue job is invoked, and by the time the Airflow DAG is marked successful, the Glue jobs are still running for several minutes.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import TimeDeltaSensor
import boto3
import json

default_args = {
    'owner': 'SAM',
    'start_date': datetime(2019, 5, 27)
}

glue_client = boto3.client('glue', region_name='us-east-1')

def run_customer_job():
    glue_client.start_job_run(JobName='customer')

def run_product_job():
    glue_client.start_job_run(JobName='product')

with DAG('dataload', default_args=default_args, schedule_interval="0 15 * * *") as dag:
    task1 = PythonOperator(task_id='task1',
                           python_callable=run_customer_job)
    task2 = PythonOperator(task_id='task2',
                           python_callable=run_product_job)

    task1 >> task2
To start an AWS Glue job, you use the start_job_run() method of the Boto3 Glue client. This triggers the job run, which executes the script stored in the S3 bucket, but the call is asynchronous: it returns a JobRunId immediately and does not wait for the job to finish. That is why each downstream task fires while the previous Glue job is still running.
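One way to make each Airflow task block until its Glue job finishes is to poll get_job_run after starting the run. A minimal sketch, assuming the same job names as in the question and an arbitrary 30-second polling interval:

import time
import boto3

glue_client = boto3.client('glue', region_name='us-east-1')

def run_glue_job_and_wait(job_name, poll_seconds=30):
    # start_job_run returns immediately with a JobRunId; it does not block.
    run_id = glue_client.start_job_run(JobName=job_name)['JobRunId']
    # Poll get_job_run until the run reaches a terminal state.
    while True:
        state = glue_client.get_job_run(
            JobName=job_name, RunId=run_id)['JobRun']['JobRunState']
        if state == 'SUCCEEDED':
            return run_id
        if state in ('FAILED', 'STOPPED', 'TIMEOUT'):
            raise RuntimeError('Glue job %s run %s ended in state %s'
                               % (job_name, run_id, state))
        time.sleep(poll_seconds)

def run_customer_job():
    run_glue_job_and_wait('customer')

def run_product_job():
    run_glue_job_and_wait('product')

With callables like these, task1 is only marked successful once the 'customer' job has actually completed, so task2 starts afterwards as intended.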
To start working with AWS Glue using Boto3, you first need to set up your Python environment on your laptop. In summary, you will need boto3 installed and AWS credentials with permission to call Glue, including a default region. Alternatively, you can set up and launch a Cloud9 IDE. AWS Glue also allows you to use crawlers to populate the AWS Glue Data Catalog tables.
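As a quick sanity check of that setup, a minimal sketch, assuming an illustrative profile named 'default' and the us-east-1 region (neither comes from the question):

import boto3

# Illustrative session setup; profile and region are assumptions.
session = boto3.Session(profile_name='default', region_name='us-east-1')
glue_client = session.client('glue')

# Confirm the credentials resolve and can reach Glue.
print(session.client('sts').get_caller_identity()['Account'])
print(glue_client.list_jobs(MaxResults=5).get('JobNames', []))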
Step 1: Import boto3 and the botocore exceptions to handle API errors.
Step 2: crawler_name is the required parameter for the function that works with the crawler.
Step 3: Create an AWS session using the boto3 library. Make sure region_name is set in the default profile; if it is not, pass region_name explicitly when creating the session.
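A minimal sketch of those steps, using a hypothetical helper named start_glue_crawler; the crawler name you pass is whatever crawler exists in your own Data Catalog:

import boto3
from botocore.exceptions import ClientError

def start_glue_crawler(crawler_name, region_name='us-east-1'):
    # Step 3: create a session; region_name is passed explicitly in case
    # the default profile does not define one.
    session = boto3.Session(region_name=region_name)
    glue_client = session.client('glue')
    try:
        # Step 2: crawler_name is the required parameter.
        glue_client.start_crawler(Name=crawler_name)
    except ClientError as error:
        # Step 1: botocore exceptions surface API errors such as
        # CrawlerRunningException or EntityNotFoundException.
        raise error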
Alternatively, create a new GlueOperator, modeled on https://github.com/apache/airflow/blob/master/airflow/contrib/operators/awsbatch_operator.py, that starts the job run and then polls its status until it reaches a terminal state; see the sketch below.
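A minimal sketch of such an operator, assuming Airflow 1.10-era imports to match the DAG above; the class name and polling interval are illustrative and not part of any Airflow release:

import time
import boto3
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class GlueJobOperator(BaseOperator):
    """Starts a Glue job and blocks until the run reaches a terminal state."""

    @apply_defaults
    def __init__(self, job_name, region_name='us-east-1', poll_interval=30,
                 *args, **kwargs):
        super(GlueJobOperator, self).__init__(*args, **kwargs)
        self.job_name = job_name
        self.region_name = region_name
        self.poll_interval = poll_interval

    def execute(self, context):
        glue_client = boto3.client('glue', region_name=self.region_name)
        run_id = glue_client.start_job_run(JobName=self.job_name)['JobRunId']
        self.log.info('Started Glue job %s, run id %s', self.job_name, run_id)
        while True:
            state = glue_client.get_job_run(
                JobName=self.job_name, RunId=run_id)['JobRun']['JobRunState']
            if state == 'SUCCEEDED':
                return run_id
            if state in ('FAILED', 'STOPPED', 'TIMEOUT'):
                raise AirflowException('Glue job %s run %s finished in state %s'
                                       % (self.job_name, run_id, state))
            time.sleep(self.poll_interval)

In the DAG above, task1 and task2 would then become GlueJobOperator(task_id='task1', job_name='customer') and GlueJobOperator(task_id='task2', job_name='product'), so task1 >> task2 only starts the product job after the customer job has actually finished.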