This is actually two questions combined into one.
My AIRFLOW_HOME is structured like this:
airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils
I've been following astronomer.io's examples at https://github.com/airflow-plugins. My custom operators use my custom hooks, and all the imports are relative to the top-level folder plugins:
# my_operator.py
from plugins.hooks.my_hook import MyHook
However, when I tried moving my entire repository into the plugins folder, I got an import error after running airflow list_dags, saying that plugins cannot be found.
I read up a little about it, and apparently Airflow loads the plugins into its core module so they can be imported like this:
# my_operator.py
from airflow.hooks.my_hook import MyHook
So I changed all the imports to read directly from airflow.plugin_type instead. I got another import error though, this time saying that my_hook cannot be found. I restart my workers, scheduler and webserver every time, but that doesn't seem to be the issue. I've looked at solutions proposed in similar questions and they don't work either.
The official documentation (https://airflow.apache.org/plugins.html) also shows a way of doing this by extending the AirflowPlugin class, but I'm not sure where this "interface" should reside. I would also prefer a drag-and-drop option.
Finally, it clearly doesn't make sense for my code repo to be the plugins folder itself, but if I separate them, testing becomes inconvenient. Do I have to modify my Airflow configuration to point to my repo every time I run unit tests on my hooks/operators? What are the best practices for testing custom plugins?
I figured this out through some trial and error. This is the final structure of my AIRFLOW_HOME folder:
airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils
In plugin_name.py, I extend the AirflowPlugin class:
# plugin_name.py
from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import *
from operators.my_operator import *
from utils.my_utils import *
# etc

class PluginName(AirflowPlugin):
    name = 'plugin_name'
    hooks = [MyHook]
    operators = [MyOperator]
    macros = [my_util_func]
In my custom operators which use my custom hooks, I import them like
# my_operator.py
from hooks.my_hook import MyHook
Then in my DAG files, I can do
# sample_dag.py
from airflow.operators.plugin_name import MyOperator
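To give a rough idea of why that import path exists: as far as I can tell, the plugin manager builds a module named after the plugin and fills it with the listed classes. The sketch below is a simplified stand-in, not Airflow's actual code. The namespace demo_airflow.operators and the function register_operators are hypothetical names made up for illustration, and MyOperator and PluginName are plain placeholder classes so that it runs without Airflow installed.

```python
import sys
import types


class MyOperator:
    """Placeholder for the custom operator (hypothetical, no Airflow needed)."""


class PluginName:
    """Placeholder mirroring the attributes of the plugin class above."""
    name = "plugin_name"
    operators = [MyOperator]


def register_operators(plugin, namespace="demo_airflow.operators"):
    """Expose plugin.operators as the importable module <namespace>.<plugin.name>.

    The namespace is an assumption for this demo; it is not the real airflow package.
    """
    # Make sure every parent package in the dotted path exists in sys.modules.
    parts = namespace.split(".")
    for i in range(1, len(parts) + 1):
        pkg_name = ".".join(parts[:i])
        if pkg_name not in sys.modules:
            pkg = types.ModuleType(pkg_name)
            pkg.__path__ = []  # an empty __path__ marks the module as a package
            sys.modules[pkg_name] = pkg

    # Build a module named after the plugin and attach each class by its name.
    module_name = f"{namespace}.{plugin.name}"
    module = types.ModuleType(module_name)
    for obj in plugin.operators:
        setattr(module, obj.__name__, obj)
    sys.modules[module_name] = module
    return module
```

After calling register_operators(PluginName), the statement from demo_airflow.operators.plugin_name import MyOperator resolves to the placeholder class, which mirrors how the name attribute of the plugin ends up in the import path used in the DAG file.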
You have to restart the webserver and scheduler for the plugin to be picked up. It took me a while to figure that out.
This also facilitates testing, since the imports within the custom classes are relative to the submodules inside the plugins folder. I wonder if I can omit the __init__.py file inside plugins, but since everything is working I didn't try that.
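On the testing point, here is a minimal sketch of the idea: because the imports are relative to plugins, a test only needs that folder on sys.path before importing the hook. To keep it runnable without Airflow, the sketch builds a throwaway stand-in for the plugins folder. The helper names make_stand_in_plugins and import_my_hook, the get_conn method, and its return value are all invented for illustration; in a real repo you would point at your actual plugins folder instead.

```python
import importlib
import sys
from pathlib import Path


def make_stand_in_plugins(root: Path) -> Path:
    """Create a tiny plugins/hooks/my_hook.py tree under root (demo only)."""
    hooks_dir = root / "plugins" / "hooks"
    hooks_dir.mkdir(parents=True)
    (hooks_dir / "__init__.py").write_text("")
    # Hypothetical hook body; the real MyHook would subclass an Airflow hook.
    (hooks_dir / "my_hook.py").write_text(
        "class MyHook:\n"
        "    def get_conn(self):\n"
        "        return 'fake-connection'\n"
    )
    return root / "plugins"


def import_my_hook(plugins_folder: Path):
    """Put the plugins folder on sys.path, then import the hook the same way
    my_operator.py does: from hooks.my_hook import MyHook."""
    folder = str(plugins_folder)
    if folder not in sys.path:
        sys.path.insert(0, folder)
    importlib.invalidate_caches()  # the files were created after startup
    module = importlib.import_module("hooks.my_hook")
    return module.MyHook
```

In a test setup, calling import_my_hook on the plugins folder once would let the individual tests import the hooks and operators with the same statements the plugin code uses, without touching the Airflow configuration.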