I found the following Airflow DAG in this Blog Post:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="Python:3.6",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:1604",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)
and before adding anything custom to it ... I attempted to run it as is. However, the code times out in my Airflow environment.
Per the documentation here, I attempted to set startup_timeout_seconds
to something ridiculous like 10 minutes, for example:
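For reference, I passed it on the failing task roughly like this:

failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:1604",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                startup_timeout_seconds=600,  # 10 minutes instead of the 120-second default
                                dag=dag
                                )

but I still got the timeout message described in the documentation: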
[2019-01-04 11:13:33,360] {pod_launcher.py:112} INFO - Event: fail-7dd76b92 had an event of type Pending
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 6, in <module>
    exec(compile(open(__file__).read(), __file__, 'exec'))
  File "/usr/local/lib/airflow/airflow/bin/airflow", line 27, in <module>
    args.func(args)
  File "/usr/local/lib/airflow/airflow/bin/cli.py", line 392, in run
    pool=args.pool,
  File "/usr/local/lib/airflow/airflow/utils/db.py", line 50, in wrapper
    result = func(*args, **kwargs)
  File "/usr/local/lib/airflow/airflow/models.py", line 1492, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    raise AirflowException('Pod Launching failed: {error}'.format(error=ex))
airflow.exceptions.AirflowException: Pod Launching failed: Pod took too long to start
Any input would be appreciated.
First, there are two reasons why your Pods can fail: errors in the configuration of your Kubernetes resources (such as deployments and services), or problems in your code. In the former case, the containers never start; in the latter, the application code fails after the container starts up. In either case you should expect to see some restarts, because Kubernetes attempts to start Pods repeatedly when errors occur. If the Pod reaches status Running but your app still isn't working correctly, the problem is in your application code rather than in the resource configuration. If instead you see an error code in the Pod status, you can get more information with the describe command; this is helpful when the container itself did not start, for example because the Pod could not be scheduled due to a lack of CPU resources.
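For example (the Pod name below is taken from your log; adjust the namespace if yours differs):

kubectl get pods --namespace default
kubectl describe pod fail-7dd76b92 --namespace default

The Events section at the end of the describe output usually states exactly why a Pod is stuck in Pending, such as a failed image pull or insufficient resources.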
Since this code isn’t using fully qualified image references, Airflow pulls the images from Docker Hub (hub.docker.com), and "Python:3.6" and "ubuntu:1604" aren’t valid image names there: repository names on Docker Hub must be lowercase, and the Ubuntu tag is 16.04, not 1604. The pull therefore never succeeds, the container never starts, and the Pod stays Pending until the operator times out. Also, the "Python" command in cmds shouldn’t be capitalised; the interpreter inside the container is python.
Working code with valid Docker image names would be:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="python:3.6-stretch",  # lowercase repository name with a valid tag
                                cmds=["python", "-c"],       # the interpreter in the image is lowercase python
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:16.04",        # the Ubuntu tag is 16.04, not 1604
                                cmds=["python", "-c"],       # intentionally fails: the stock ubuntu image does not ship python
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
failing.set_upstream(start)
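As an aside, the two set_upstream calls at the end can equivalently be written with Airflow's bitshift operators, which read in the direction the tasks run:

start >> passing
start >> failing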