I am trying to move S3 files from a "non-deleting" bucket (meaning I can't delete the files) to GCS using Airflow. I can't be guaranteed that new files will be there every day, but I must check for new files every day.
My problem is the dynamic creation of subdags. If there ARE files, I need subdags. If there are NOT files, I don't need subdags. My problem is setting the upstream/downstream dependencies. In my code, it does detect files, but it does not kick off the subdags as it is supposed to. I'm missing something.
Here's my code:
from airflow import models
from airflow.utils.helpers import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.contrib.operators.s3_to_gcs_operator import S3ToGoogleCloudStorageOperator
from airflow.utils import dates
from airflow.models import Variable
import logging
args = {
    'owner': 'Airflow',
    'start_date': dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_success': True,
}
bucket = 'mybucket'
prefix = 'myprefix/'
LastBDEXDate = int(Variable.get("last_publish_date"))
maxdate = LastBDEXDate
files = []
parent_dag = models.DAG(
    dag_id='My_Ingestion',
    default_args=args,
    schedule_interval='@daily',
    catchup=False
)
def Check_For_Files(**kwargs):
    s3 = S3Hook(aws_conn_id='S3_BOX')
    s3.get_conn()
    bucket = bucket
    LastBDEXDate = int(Variable.get("last_publish_date"))
    maxdate = LastBDEXDate
    files = s3.list_keys(bucket_name=bucket, prefix='myprefix/file')
    for file in files:
        print(file)
        print(file.split("_")[-2])
        print(file.split("_")[-2][-8:])  ## proves I can see a date in the file name is ok.
        maxdate = maxdate if maxdate > int(file.split("_")[-2][-8:]) else int(file.split("_")[-2][-8:])
    if maxdate > LastBDEXDate:
        return 'Start_Process'
    return 'finished'
def create_subdag(dag_parent, dag_id_child_prefix, file_name):
    # dag params
    dag_id_child = '%s.%s' % (dag_parent.dag_id, dag_id_child_prefix)
    # dag
    subdag = models.DAG(dag_id=dag_id_child,
                        default_args=args,
                        schedule_interval=None)
    # operators
    s3_to_gcs_op = S3ToGoogleCloudStorageOperator(
        task_id=dag_id_child,
        bucket=bucket,
        prefix=file_name,
        dest_gcs_conn_id='GCP_Account',
        dest_gcs='gs://my_files/To_Process/',
        replace=False,
        gzip=True,
        dag=subdag)
    return subdag
def create_subdag_operator(dag_parent, filename, index):
    tid_subdag = 'file_{}'.format(index)
    subdag = create_subdag(dag_parent, tid_subdag, filename)
    sd_op = SubDagOperator(task_id=tid_subdag, dag=dag_parent, subdag=subdag)
    return sd_op

def create_subdag_operators(dag_parent, file_list):
    subdags = [create_subdag_operator(dag_parent, file, file_list.index(file)) for file in file_list]
    # chain subdag-operators together
    chain(*subdags)
    return subdags
check_for_files = BranchPythonOperator(
    task_id='Check_for_s3_Files',
    provide_context=True,
    python_callable=Check_For_Files,
    dag=parent_dag
)

finished = DummyOperator(
    task_id='finished',
    dag=parent_dag
)

decision_to_continue = DummyOperator(
    task_id='Start_Process',
    dag=parent_dag
)
if len(files) > 0:
    subdag_ops = create_subdag_operators(parent_dag, files)
    check_for_files >> decision_to_continue >> subdag_ops[0] >> subdag_ops[-1] >> finished

check_for_files >> finished
Below is the recommended way to create a dynamic DAG or sub-DAG in Airflow. There are other ways as well, but I think this is largely applicable to your problem.
First, create a file (YAML/CSV) which includes the list of all S3 files and their locations. In your case you have written a function to store them in a list; I would say store them in a separate YAML file instead, load it at run time in the Airflow environment, and then create the DAGs.
Below is a sample YAML file:
dynamicDagConfigFile.yaml
job: dynamic-dag
bucket_name: 'bucket-name'
prefix: 'bucket-prefix'
S3Files:
    - File1: 'S3Loc1'
    - File2: 'S3Loc2'
    - File3: 'S3Loc3'
You can modify your Check_For_Files function to store them in a YAML file.
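For example, here is a minimal sketch of how Check_For_Files could dump the detected keys into that YAML file; the path, connection id and key layout below are assumptions based on your snippet, so adjust them to your setup:

import yaml
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

def Check_For_Files(**kwargs):
    # List the keys under the prefix (same call you already use)
    s3 = S3Hook(aws_conn_id='S3_BOX')
    files = s3.list_keys(bucket_name='mybucket', prefix='myprefix/file') or []

    # Build the structure the dynamic DAG expects: one {name: location} entry per file
    config = {
        'job': 'dynamic-dag',
        'bucket_name': 'mybucket',
        'prefix': 'myprefix/',
        'S3Files': [{key.split('/')[-1]: key} for key in files],
    }

    # Write the config so the DAG file can load it at parse time
    with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml', 'w') as f:
        yaml.safe_dump(config, f, default_flow_style=False)

    return 'Start_Process' if files else 'finished'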
Now we can move on to dynamic DAG creation.
First, define two tasks using dummy operators, i.e. the start and the end task. These are the tasks we build our DAG on, by dynamically creating tasks between them:
start = DummyOperator(
    task_id='start',
    dag=dag
)

end = DummyOperator(
    task_id='end',
    dag=dag
)
Dynamic DAG:
We will use PythonOperators in Airflow. The function should receive as arguments the task id; a Python function to be executed, i.e. the python_callable for the PythonOperator; and a set of args to be used during execution. Including the task id as an argument lets us exchange data among the dynamically generated tasks, e.g. via XCom.
You can plug your own operation function, like s3_to_gcs_op, into this dynamic DAG.
def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # eval is used since the callableFunction var is of type string,
        # while the python_callable argument for PythonOperator only accepts callables, not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task
Finally, based on the locations present in the YAML file, you can create the dynamic DAG. First, read the YAML file as below and create the dynamic tasks:
with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    # use safe_load to load the YAML file
    configFile = yaml.safe_load(f)

# Extract file list
S3Files = configFile['S3Files']

# In this loop tasks are created for each file defined in the YAML file
for S3File in S3Files:
    for fileName, fileLocation in S3File.items():
        # Remember the task id is provided in order to exchange data among dynamically generated tasks.
        get_s3_files = createDynamicDAG('{}-getS3Data'.format(fileName),
                                        'getS3Data',
                                        {})  # your configs here.

        # Second step is to upload from S3 to GCS
        upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(fileName),
                                           'uploadDataS3ToGCS',
                                           {'previous_task_id': '{}-'})
        # write your configs again here, like the S3 bucket name and prefix (or read them from the YAML file), plus the GCS config.
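As an aside, here is a minimal sketch of how the downstream callable could use that previous_task_id kwarg to pull whatever its paired upstream task pushed via XCom (the body below is only an illustration of the mechanics, not part of the code above):

def uploadDataS3ToGCS(previous_task_id, **kwargs):
    # 'ti' (the TaskInstance) is available in kwargs because provide_context=True
    pushed_value = kwargs['ti'].xcom_pull(task_ids=previous_task_id)
    print('received from {}: {}'.format(previous_task_id, pushed_value))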
Final DAG definition:
The idea is that once the tasks are generated, they should be linked with the dummy operators created for the start and end tasks:

start >> get_s3_files
get_s3_files >> upload_s3_toGCS
upload_s3_toGCS >> end
Full Airflow code, in order:
import yaml
import airflow
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

# The original snippet attaches tasks to `dag` without ever defining it;
# a minimal DAG definition is assumed here -- adjust the dag_id, start_date and schedule to your needs.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 1, 1),
}

dag = DAG(
    dag_id='dynamic_s3_to_gcs',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
)

start = DummyOperator(
    task_id='start',
    dag=dag
)

def createDynamicDAG(task_id, callableFunction, args):
    task = PythonOperator(
        task_id=task_id,
        provide_context=True,
        # eval is used since the callableFunction var is of type string,
        # while the python_callable argument for PythonOperator only accepts callables, not strings.
        python_callable=eval(callableFunction),
        op_kwargs=args,
        xcom_push=True,
        dag=dag,
    )
    return task

end = DummyOperator(
    task_id='end',
    dag=dag
)

# NOTE: the callables referenced by name below (getS3Data, uploadDataS3ToGCS)
# are assumed to be defined in this file; plug in your own functions.

with open('/usr/local/airflow/dags/config_files/dynamicDagConfigFile.yaml') as f:
    configFile = yaml.safe_load(f)

# Extract file list
S3Files = configFile['S3Files']

# In this loop tasks are created for each file defined in the YAML file
for S3File in S3Files:
    for fileName, fileLocation in S3File.items():
        # Remember the task id is provided in order to exchange data among dynamically generated tasks.
        get_s3_files = createDynamicDAG('{}-getS3Data'.format(fileName),
                                        'getS3Data',
                                        {})  # your configs here.

        # Second step is to upload from S3 to GCS
        upload_s3_toGCS = createDynamicDAG('{}-uploadDataS3ToGCS'.format(fileName),
                                           'uploadDataS3ToGCS',
                                           {'previous_task_id': '{}-'})
        # write your configs again here, like the S3 bucket name and prefix (or read them from the YAML file), plus the GCS config.

        start >> get_s3_files
        get_s3_files >> upload_s3_toGCS
        upload_s3_toGCS >> end
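The getS3Data and uploadDataS3ToGCS callables are left as placeholders above. Purely as an illustration, they might look something like the sketch below, using the S3 and GCS hooks; the connection ids, bucket names and paths are assumptions you would replace with your own:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.google.cloud.hooks.gcs import GCSHook

def getS3Data(**kwargs):
    # List the S3 keys for this file; the returned value is pushed to XCom automatically
    s3 = S3Hook(aws_conn_id='S3_BOX')
    return s3.list_keys(bucket_name='mybucket', prefix='myprefix/file') or []

def uploadDataS3ToGCS(previous_task_id, **kwargs):
    # Pull the keys produced by the paired getS3Data task and copy each object to GCS
    keys = kwargs['ti'].xcom_pull(task_ids=previous_task_id) or []
    s3 = S3Hook(aws_conn_id='S3_BOX')
    gcs = GCSHook(gcp_conn_id='GCP_Account')
    for key in keys:
        local_path = '/tmp/{}'.format(key.split('/')[-1])
        s3.get_key(key, bucket_name='mybucket').download_file(local_path)
        gcs.upload(bucket_name='my_files',
                   object_name='To_Process/{}'.format(key.split('/')[-1]),
                   filename=local_path)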