I am running Airflow v1.10.15 on Cloud Composer v1.16.16.
My DAG looks like this:
from datetime import datetime, timedelta
import logging

# imports
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

from scripts import workday_extract, workday_config_large

# logger used inside the callable below
logger = logging.getLogger(__name__)

default_args = {
    'owner': 'xxxx',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 14),
    'email_on_failure': True,
    'email': ['xxxx'],
    'retries': 1,
    'retry_delay': timedelta(minutes=2),
    'catchup': False
}

# Define the DAG with parameters
dag = DAG(
    dag_id='xxxx_v1',
    schedule_interval='0 20 * * *',
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    concurrency=1
)


def wd_to_bq(key, val, **kwargs):
    logger.info("workday to BQ ingestion")
    workday_extract.fetch_wd_load_bq(key, val)


start_load = DummyOperator(task_id='start', dag=dag)
end_load = DummyOperator(task_id='end', dag=dag)

for key, val in workday_config_large.endpoint_tbl_mapping.items():
    # Task 1: Process the unmatched records from the view
    workday_to_bq = PythonOperator(
        dag=dag,
        task_id=f'{key}',
        execution_timeout=timedelta(minutes=60),
        provide_context=True,
        python_callable=wd_to_bq,
        op_kwargs={'key': key, 'val': val}
    )
    start_load >> workday_to_bq >> end_load
The task fails with the error Task exited with return code Negsignal.SIGKILL. The Python script runs fine on my local machine and completes in 15 minutes. There are multiple endpoints from which the reports are extracted. However, the one that takes the longest (~15 minutes) fails with this error, while the others succeed.
I have tried a lot of options but none seem to work. Can someone help with this?
I resolved the issue by increasing the memory size:
https://github.com/apache/airflow/issues/10435
You should check the memory size of the pod that acts as the worker while it runs the task.
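To see from inside the task how much memory the worker is actually consuming, you can log the peak RSS around the extraction call. This is only a sketch: the log_peak_memory helper is my own addition (standard-library resource module only), and wd_to_bq / workday_extract refer to the DAG in the question.

import logging
import resource

from scripts import workday_extract

logger = logging.getLogger(__name__)

def log_peak_memory(label):
    # On Linux, ru_maxrss is the peak resident set size in kilobytes
    peak_kb = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    logger.info("%s: peak RSS so far = %.1f MB", label, peak_kb / 1024.0)

def wd_to_bq(key, val, **kwargs):
    log_peak_memory("before extraction")
    workday_extract.fetch_wd_load_bq(key, val)
    log_peak_memory("after extraction")

If the second number is close to the memory available per worker pod, the OOM killer is almost certainly what is sending the SIGKILL.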
A message like the one below in your Airflow task logs suggests that the kernel/OS killed your process. SIGKILL (signal 9) is a directive to kill the process immediately.
{{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL
It is very likely that the task you are performing (in this case the workday_to_bq task, i.e. the wd_to_bq callable) was using a lot of resources on the worker container. I'm assuming that you are ingesting and processing some data, which can be memory intensive.
You've mentioned that it works locally but fails in Cloud Composer. This could be because you either have a lot of RAM on your local system, or your Cloud Composer workers are processing other DAGs that are hogging the worker memory. To confirm that this is a memory issue, check the monitoring dashboard provided by the cloud service.
Airflow runs its tasks on workers, so you will have to upgrade the workers to better hardware. Try increasing the RAM.
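If you don't want to resize the whole Composer environment, one common alternative on Composer 1.x / Airflow 1.10 is to run only the heavy extraction in its own pod with an explicit memory request and limit via the KubernetesPodOperator. This is just a sketch; the image, entrypoint, namespace and memory figures below are placeholders, not values from your setup.

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

heavy_extract = KubernetesPodOperator(
    task_id='workday_heavy_extract',
    name='workday-heavy-extract',
    namespace='default',  # Composer 1.x workloads usually run in the 'default' namespace
    image='gcr.io/your-project/workday-extract:latest',  # placeholder image containing your script
    cmds=['python', '-m', 'scripts.workday_extract'],    # placeholder entrypoint
    resources={'request_memory': '4Gi', 'limit_memory': '8Gi'},  # dedicated memory budget for this task
    dag=dag,
)

That way only this one task gets the bigger memory budget instead of every worker in the environment.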
Note that the purpose of Airflow is to schedule ETL tasks and orchestrate the pipeline. You shouldn't be loading high volumes of data onto the Airflow workers and using up all of their CPU/memory. That will slow down your entire Airflow environment or SIGKILL your DAGs at random. In most cases only the DAG/process that is using too much memory will be killed by the OOM killer, but sometimes it can kill other DAGs/processes on the same worker at the same time.
For loading/processing/writing large amounts of data, use ETL tools like Fivetran, Airbyte, Databricks, NiFi, Azure Data Factory, etc., and use Airflow for scheduling and orchestration.
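If you need to keep the ingestion inside the DAG for now, you can at least keep the worker's memory footprint flat by streaming the report to a temporary file and letting a BigQuery load job pick it up from there. This is a sketch under assumptions: the report URL, authentication, table ID, CSV format and the requests / google-cloud-bigquery packages are placeholders, not details from the original workday_extract module.

import tempfile

import requests
from google.cloud import bigquery

def fetch_wd_load_bq_streaming(report_url, table_id, auth):
    # Stream the report to disk chunk by chunk, then hand the file to a BigQuery
    # load job, so the worker never holds the whole report in memory at once.
    client = bigquery.Client()
    with tempfile.NamedTemporaryFile(suffix='.csv') as tmp:
        with requests.get(report_url, auth=auth, stream=True) as resp:
            resp.raise_for_status()
            for chunk in resp.iter_content(chunk_size=1 << 20):  # 1 MB chunks
                tmp.write(chunk)
        tmp.flush()
        tmp.seek(0)
        job_config = bigquery.LoadJobConfig(
            source_format=bigquery.SourceFormat.CSV,
            skip_leading_rows=1,
            autodetect=True,
        )
        client.load_table_from_file(tmp, table_id, job_config=job_config).result()

The load job itself runs on the BigQuery side, so memory use on the Airflow worker stays roughly constant no matter how large the report is.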