Writing and importing custom plugins in Airflow

Tags:

airflow

This is actually two questions combined into one.

My AIRFLOW_HOME is structured like

airflow
+-- dags
+-- plugins
    +-- __init__.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py
        +-- another_hook.py
    +-- operators
        +-- __init__.py
        +-- my_operator.py
        +-- another_operator.py
    +-- sensors
    +-- utils

I've been following astronomer.io's examples at https://github.com/airflow-plugins. My custom operators use my custom hooks, and all the imports are relative to the top-level folder plugins.

# my_operator.py
from plugins.hooks.my_hook import MyHook

However, when I tried moving my entire repository into the plugins folder, I get an import error after running airflow list_dags saying that plugins cannot be found.

I read up a little about it and apparently Airflow loads the plugins into its core module so they can be imported like

# my_operator.py
from airflow.hooks.my_hook import MyHook

So I changed all the imports to read directly from airflow.plugin_type instead. I get another import error though, this time saying that my_hook cannot be found. I restart my workers, scheduler and webserver every time but that doesn't seem to be the issue. I've looked at solutions proposed in similar questions and they don't work either.

The official documentation (https://airflow.apache.org/plugins.html) also shows this way of extending the AirflowPlugin class, but I'm not sure where this "interface" should reside. I would also prefer a drag-and-drop option.

Finally, it clearly doesn't make sense for my code repo to be the plugins folder itself, but if I separate them testing becomes inconvenient. Do I have to modify my Airflow configurations to point to my repo every time I run unit tests on my hooks/ops? What are the best practices for testing custom plugins?

Asked Dec 01 '22 by absolutelydevastated

1 Answer

I figured this out by doing some trial and error. This is the final structure of my AIRFLOW_HOME folder

airflow 
+-- dags 
+-- plugins
    +-- __init__.py
    +-- plugin_name.py
    +-- hooks
        +-- __init__.py
        +-- my_hook.py 
        +-- another_hook.py 
    +-- operators
        +-- __init__.py
        +-- my_operator.py 
        +-- another_operator.py 
    +-- sensors 
    +-- utils

In plugin_name.py, I extend the AirflowPlugin class

# plugin_name.py

from airflow.plugins_manager import AirflowPlugin
from hooks.my_hook import MyHook
from operators.my_operator import MyOperator
from utils.my_utils import my_util_func
# etc.

class PluginName(AirflowPlugin):

    name = 'plugin_name'

    hooks = [MyHook]
    operators = [MyOperator]
    macros = [my_util_func]
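
For reference, a hook such as hooks/my_hook.py can be a minimal sketch along these lines. All names here are hypothetical, and the Airflow import is guarded so the sketch also runs standalone without an Airflow install:

```python
# my_hook.py - hypothetical minimal hook; class and connection names are illustrative
try:
    from airflow.hooks.base_hook import BaseHook  # Airflow 1.x import path
except ImportError:
    class BaseHook(object):  # stand-in so the sketch runs without Airflow
        pass

class MyHook(BaseHook):
    def __init__(self, conn_id="my_conn_id"):
        # A real hook would typically look up the Airflow connection here
        self.conn_id = conn_id

    def get_records(self, query):
        # Placeholder: a real hook would open a connection and execute the query
        return []
```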

In my custom operators which use my custom hooks, I import them like

# my_operator.py

from hooks.my_hook import MyHook
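
Fleshed out, my_operator.py could look roughly like this. The names are hypothetical, the Airflow import is guarded so the sketch runs standalone, and MyHook is stubbed in place of the real import shown above:

```python
# my_operator.py - hypothetical sketch; in the real file the hook comes from:
#   from hooks.my_hook import MyHook
try:
    from airflow.models import BaseOperator
except ImportError:
    class BaseOperator(object):  # stand-in so the sketch runs without Airflow
        def __init__(self, **kwargs):
            self.task_id = kwargs.get("task_id")

class MyHook(object):  # stand-in for hooks.my_hook.MyHook
    def get_records(self, query):
        return []

class MyOperator(BaseOperator):
    def __init__(self, query="SELECT 1", **kwargs):
        super().__init__(**kwargs)
        self.query = query

    def execute(self, context):
        # Instantiate the hook at execute time, not in __init__, so no
        # connection work happens while the DAG file is merely parsed
        hook = MyHook()
        return hook.get_records(self.query)
```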

Then in my DAG files, I can do

# sample_dag.py

from airflow.operators.plugin_name import MyOperator

It is necessary to restart the webserver and scheduler after adding or changing a plugin; that took me a while to figure out.

This also facilitates testing, since the imports within the custom classes are relative to the submodules within the plugins folder. I wonder if I can omit the __init__.py file inside plugins, but since everything is working I haven't tried removing it.
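
To illustrate one way of running unit tests without touching the Airflow configuration (this conftest.py is my own sketch, not part of the answer above): putting the plugins folder on sys.path lets the same relative imports resolve under pytest.

```python
# conftest.py - hypothetical; place next to the tests so pytest picks it up
import os
import sys

# Resolve the plugins folder relative to AIRFLOW_HOME (default: current dir)
AIRFLOW_HOME = os.environ.get("AIRFLOW_HOME", os.getcwd())
PLUGINS_DIR = os.path.join(AIRFLOW_HOME, "plugins")

# Prepend so "from hooks.my_hook import MyHook" resolves in tests too
if PLUGINS_DIR not in sys.path:
    sys.path.insert(0, PLUGINS_DIR)
```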

Answered Dec 20 '22 by absolutelydevastated