I am trying to use the KubernetesPodOperator in Airflow, and there is a directory on my Airflow worker that I want to share with the Kubernetes pod. Is there a way to mount the Airflow worker's directory into the Kubernetes pod?
I tried the code below, but the volume does not seem to be mounted.
import datetime
import unittest
from unittest import TestCase

from airflow.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.kubernetes.volume import Volume
from airflow.kubernetes.volume_mount import VolumeMount


class TestMailAlarm(TestCase):
    def setUp(self):
        self.namespace = "test-namespace"
        self.image = "ubuntu:16.04"
        self.name = "default"
        self.cluster_context = "default"
        self.dag_id = "test_dag"
        self.task_id = "root_test_dag"
        self.execution_date = datetime.datetime.now()
        self.context = {"dag_id": self.dag_id,
                        "task_id": self.task_id,
                        "execution_date": self.execution_date}
        self.cmds = ["sleep"]
        self.arguments = ["100"]
        self.volume_mount = VolumeMount('test',
                                        mount_path='/tmp',
                                        sub_path=None,
                                        read_only=False)
        volume_config = {
            'persistentVolumeClaim':
                {
                    'claimName': 'test'
                }
        }
        self.volume = Volume(name='test', configs=volume_config)
        self.operator = KubernetesPodOperator(
            namespace=self.namespace, image=self.image, name=self.name,
            cmds=self.cmds,
            arguments=self.arguments,
            startup_timeout_seconds=600,
            is_delete_operator_pod=True,
            # the operator could run successfully but the directory /tmp is not mounted to kubernetes operator
            volume=[self.volume],
            volume_mount=[self.volume_mount],
            **self.context)

    def test_execute(self):
        self.operator.execute(self.context)
Define a Kubernetes pod with two containers. Each has the shared volume mounted on a directory, which is specified by the mountPath.
You can see which volumes are mounted on the pod in the output of kubectl describe pod, which has a Mounts section in each container's spec. You can then exec into the pod using kubectl exec and cd to the directory you want to write data to.
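The same check can be done programmatically. Below is a minimal sketch, assuming the pod has been exported as a dictionary in the shape that kubectl get pod -o json produces. The helper name mounts_by_container and the sample_pod data are made up for illustration; they are not part of Airflow or kubectl.

```python
def mounts_by_container(pod):
    """Map each container name to its (volume name, mountPath) pairs."""
    result = {}
    for container in pod.get("spec", {}).get("containers", []):
        result[container["name"]] = [
            (mount["name"], mount["mountPath"])
            for mount in container.get("volumeMounts", [])
        ]
    return result


# Hypothetical pod description, trimmed to the fields that matter here.
sample_pod = {
    "spec": {
        "volumes": [
            {"name": "test", "persistentVolumeClaim": {"claimName": "test"}},
        ],
        "containers": [
            {
                "name": "base",
                "image": "ubuntu:16.04",
                "volumeMounts": [{"name": "test", "mountPath": "/tmp"}],
            },
        ],
    }
}

print(mounts_by_container(sample_pod))
```

An empty list for a container means nothing was mounted into it, which is the symptom described in the question.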
The example in the docs looks very similar to your code, except that the parameter names are plural: volume_mounts and volumes. For your code it would look like this:
self.operator = KubernetesPodOperator(
    namespace=self.namespace, image=self.image, name=self.name,
    cmds=self.cmds,
    arguments=self.arguments,
    startup_timeout_seconds=600,
    is_delete_operator_pod=True,
    # note the plural parameter names: volumes and volume_mounts
    volumes=[self.volume],
    volume_mounts=[self.volume_mount],
    **self.context)
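Misspelled keyword names like these are easy to miss when a constructor also accepts arbitrary extra keyword arguments, because the call then succeeds and the extras are dropped. Whether that is what happens in your Airflow version is an assumption, not something confirmed here. The sketch below shows one way to catch such names up front with the standard inspect module; FakePodOperator is a hypothetical stand-in, not the real KubernetesPodOperator, and unexpected_kwargs is an illustrative helper.

```python
import inspect


def unexpected_kwargs(callable_obj, kwargs):
    """Return the keyword names that are not named parameters of callable_obj."""
    params = inspect.signature(callable_obj).parameters
    named = {
        name
        for name, param in params.items()
        if param.kind in (param.POSITIONAL_OR_KEYWORD, param.KEYWORD_ONLY)
    }
    return sorted(set(kwargs) - named)


class FakePodOperator:
    """Hypothetical stand-in that swallows unknown keywords via **kwargs."""

    def __init__(self, namespace, image, name,
                 volumes=None, volume_mounts=None, **kwargs):
        self.volumes = volumes or []
        self.volume_mounts = volume_mounts or []


# Singular names, as in the question: accepted by **kwargs but never used.
question_kwargs = {
    "namespace": "test-namespace",
    "image": "ubuntu:16.04",
    "name": "default",
    "volume": ["some-volume"],
    "volume_mount": ["some-mount"],
}

print(unexpected_kwargs(FakePodOperator, question_kwargs))
# prints ['volume', 'volume_mount']
```

With the plural names the helper returns an empty list, so a check like this could sit in a test before the operator is constructed.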