What am I doing wrong in this DAG setup for KubernetesPodOperator?

I found the following Airflow DAG in this blog post:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="Python:3.6",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

Before adding anything custom to it, I tried to run it as is. However, the tasks time out in my Airflow environment.

Per the documentation, I tried setting startup_timeout_seconds to something ridiculous like 10 minutes, but I still got the timeout message described there:

[2019-01-04 11:13:33,360] {pod_launcher.py:112} INFO - Event: fail-7dd76b92 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
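
The log line just before the traceback shows the pod still in Pending when the launcher gives up. As a rough, hypothetical model of that wait (a sketch, not Airflow's actual pod_launcher code; wait_for_pod_start, FakeClock and PodStartTimeout are illustrative names), the launcher keeps polling the pod phase until it leaves Pending or startup_timeout_seconds elapses. If nothing ever moves the pod out of Pending, a larger timeout only delays the same error:

```python
from typing import Callable


class PodStartTimeout(Exception):
    """Raised when a pod stays Pending longer than the startup timeout."""


def wait_for_pod_start(
    get_phase: Callable[[], str],
    startup_timeout_seconds: float,
    clock: Callable[[], float],
    sleep: Callable[[float], None],
    poll_interval: float = 1.0,
) -> str:
    """Poll the pod phase until it leaves Pending or the timeout expires.

    get_phase is a hypothetical stand-in for reading pod.status.phase from the
    Kubernetes API; clock and sleep are injected so the loop runs without real waiting.
    """
    start = clock()
    while True:
        phase = get_phase()
        if phase != "Pending":
            return phase  # e.g. "Running", "Succeeded" or "Failed"
        if clock() - start >= startup_timeout_seconds:
            raise PodStartTimeout("Pod took too long to start")
        sleep(poll_interval)


class FakeClock:
    """Simulated time: sleep() advances the clock instead of blocking."""

    def __init__(self) -> None:
        self.now = 0.0

    def time(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.now += seconds


# An image that can never be pulled keeps the pod Pending forever, so even a
# 600-second timeout only postpones the same error.
clock = FakeClock()
try:
    wait_for_pod_start(lambda: "Pending", 600, clock.time, clock.sleep)
except PodStartTimeout as exc:
    print(f"gave up after {clock.now:.0f}s: {exc}")

# A pod whose image pulls successfully leaves Pending and the wait ends.
phases = iter(["Pending", "Pending", "Running"])
clock = FakeClock()
print(wait_for_pod_start(lambda: next(phases), 600, clock.time, clock.sleep))
```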

Any input would be appreciated.

asked by CaffeineAddiction, Jan 04 '19 at 11:01



1 Answer

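Since this code does not use fully qualified image names, the images are pulled from Docker Hub (hub.docker.com) by default, and neither "Python:3.6" nor "ubuntu:1604" can be pulled from there: Docker repository names must be lowercase, so "Python:3.6" is not even a valid image reference, and the Ubuntu tag is "16.04", not "1604". Because the image pull never succeeds, each pod stays in Pending until startup_timeout_seconds runs out, which is why raising the timeout only delays the same "Pod took too long to start" error.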
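
To make "fully qualified" concrete, the sketch below expands short references roughly the way Docker Hub defaults are applied: a missing registry becomes docker.io, a single-component name gets the library/ namespace, and a missing tag becomes latest. It is a simplified, hypothetical helper (normalize_image is not part of Airflow or Docker) that covers only a subset of the Docker reference grammar and ignores digests. It rejects "Python:3.6" because of the uppercase letter, but it accepts "ubuntu:1604": that reference is well formed, and the missing tag only surfaces when the pull is attempted.

```python
import re

# Simplified subset of the Docker image reference grammar (no digests,
# no validation of the registry host name itself).
_COMPONENT = re.compile(r"[a-z0-9]+(?:(?:[._]|__|-+)[a-z0-9]+)*")
_TAG = re.compile(r"[\w][\w.-]{0,127}")


def normalize_image(ref: str) -> str:
    """Expand a short image reference using Docker Hub defaults.

    Raises ValueError for references a registry would reject, e.g. uppercase names.
    """
    name, tag = ref, "latest"
    # A colon after the last slash introduces the tag; a colon before it
    # belongs to a registry port such as localhost:5000.
    last_slash = ref.rfind("/")
    colon = ref.rfind(":")
    if colon > last_slash:
        name, tag = ref[:colon], ref[colon + 1:]
    if not _TAG.fullmatch(tag):
        raise ValueError(f"invalid tag in {ref!r}")

    parts = name.split("/")
    # The first component is a registry host only if it looks like one.
    if len(parts) > 1 and ("." in parts[0] or ":" in parts[0] or parts[0] == "localhost"):
        registry, path = parts[0], parts[1:]
    else:
        registry, path = "docker.io", parts
    # Official Docker Hub images live under the "library" namespace.
    if registry == "docker.io" and len(path) == 1:
        path = ["library"] + path
    for component in path:
        if not _COMPONENT.fullmatch(component):
            raise ValueError(f"invalid repository name in {ref!r}: {component!r}")
    return f"{registry}/{'/'.join(path)}:{tag}"


for ref in ["python:3.6-stretch", "ubuntu:16.04", "ubuntu:1604", "Python:3.6"]:
    try:
        print(ref, "->", normalize_image(ref))
    except ValueError as exc:
        print(ref, "->", exc)
```

Passing the expanded form, for example image="docker.io/library/python:3.6-stretch", should behave the same as the short name while making the registry explicit.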

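Also, the "Python" command should not be capitalised: the interpreter inside the Python image is called "python", and Linux file names are case-sensitive, so "Python" would not be found even once the image is pulled.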
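
In Kubernetes terms, the operator's cmds becomes the container's command (overriding the image's ENTRYPOINT) and arguments becomes its args (overriding CMD). The two lists are concatenated and the first element is executed directly, without a shell, so it has to match an executable in the image exactly, including case. A small local sketch of that concatenation, assuming a hypothetical container_argv helper and using sys.executable as a stand-in for the image's python:

```python
import subprocess
import sys
from typing import List


def container_argv(cmds: List[str], arguments: List[str]) -> List[str]:
    """Concatenate command and args the way a container's argv is built."""
    return list(cmds) + list(arguments)


argv = container_argv(["python", "-c"], ["print('hello world')"])
print(argv)  # ['python', '-c', "print('hello world')"]

# Run the same argv locally, substituting this interpreter's path for "python"
# so the sketch does not depend on what is installed on PATH.
result = subprocess.run([sys.executable] + argv[1:], capture_output=True, text=True, check=True)
print(result.stdout, end="")  # hello world
```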

A working version with valid Docker image names would be:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
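                          # lowercase repository name and a tag that exists on Docker Hub
                          image="python:3.6-stretch",
                          # the interpreter in the image is "python"; names are case-sensitive
                          cmds=["python","-c"],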
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
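                          # the published Ubuntu tag is "16.04", not "1604"
                          image="ubuntu:16.04",
                          # the stock ubuntu image ships without Python, so this task is
                          # still expected to fail once its pod starts (hence its name)
                          cmds=["python","-c"],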
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

answered by rilla, Oct 19 '22 at 03:10