
Can't import Airflow plugins

Tags: airflow

Following Airflow tutorial here.

Problem: The webserver returns the following error

Broken DAG: [/usr/local/airflow/dags/test_operator.py] cannot import name MyFirstOperator

Notes: The directory structure looks like this:

airflow_home
├── airflow.cfg
├── airflow.db
├── dags
│   └── test_operators.py  
├── plugins
│   └── my_operators.py   
└── unittests.cfg

I am attempting to import the plugin in 'test_operators.py' like this:

from airflow.operators import MyFirstOperator

The code is all the same as what is found in the tutorial.

Christopher Carlson asked May 11 '17 06:05


People also ask

How do I import plugins into Airflow?

Airflow has a simple plugin manager built-in that can integrate external features to its core by simply dropping files in your $AIRFLOW_HOME/plugins folder. The python modules in the plugins folder get imported, and macros and web views get integrated to Airflow's main collections and become available for use.

How do I import a Python library into Airflow?

You can do it in one of these ways: add your modules to one of the folders that Airflow automatically adds to PYTHONPATH; add the extra folders where you keep your code to PYTHONPATH; or package your code into a Python package and install it together with Airflow.


8 Answers

After struggling with the Airflow documentation and trying some of the answers here without success, I found this approach from astronomer.io.

As they point out, building an Airflow Plugin can be confusing and perhaps not the best way to add hooks and operators going forward.

Custom hooks and operators are a powerful way to extend Airflow to meet your needs. There is however some confusion on the best way to implement them. According to the Airflow documentation, they can be added using Airflow’s Plugins mechanism. This however, overcomplicates the issue and leads to confusion for many people. Airflow is even considering deprecating using the Plugins mechanism for hooks and operators going forward.

So instead of messing around with the Plugins API I followed Astronomer's approach, setting up Airflow as shown below.

dags
└── my_dag.py               (contains dag and tasks)
plugins
├── __init__.py
├── hooks
│   ├── __init__.py
│   └── mytest_hook.py      (contains class MyTestHook)
└── operators
    ├── __init__.py
    └── mytest_operator.py  (contains class MyTestOperator)

With this approach, all the code for my operator and hook lives entirely in their respective files, and there's no confusing plugin file. All the __init__.py files are empty (unlike some equally confusing approaches that put Plugin code in some of them).

For the imports needed, consider how Airflow actually uses the plugins directory:

When Airflow is running, it will add dags/, plugins/, and config/ to PATH

This means that from airflow.operators.mytest_operator import MyTestOperator probably isn't going to work. Instead, from operators.mytest_operator import MyTestOperator is the way to go (note how it lines up with from directory/file.py import Class in my setup above).
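
The import rule can be reproduced without Airflow installed. The sketch below is only an illustration: it builds a throwaway copy of the plugins/ layout in a temporary directory, puts that directory on Python's module search path by hand (standing in for what Airflow does with the real plugins folder), and uses a stub class instead of a real operator.

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Build a throwaway directory that mimics the plugins/ layout above.
# The location and the stub class body are illustrative, not Airflow code.
plugins_dir = Path(tempfile.mkdtemp()) / "plugins"
operators_pkg = plugins_dir / "operators"
operators_pkg.mkdir(parents=True)
(plugins_dir / "__init__.py").touch()
(operators_pkg / "__init__.py").touch()
(operators_pkg / "mytest_operator.py").write_text(
    "class MyTestOperator:\n    pass\n"
)

# Stand-in for Airflow adding the plugins folder to the module search path.
sys.path.insert(0, str(plugins_dir))
importlib.invalidate_caches()

# The import is resolved relative to plugins/, not via airflow.operators.
from operators.mytest_operator import MyTestOperator

print(MyTestOperator.__module__)
```

The module name that gets printed starts at operators, which is why the airflow.operators prefix has no place in the import.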

Working snippets from my files are shown below.

my_dag.py:

from airflow import DAG
from operators.mytest_operator import MyTestOperator
default_args = {....}
dag = DAG(....)
....
# task_id may not contain spaces
mytask = MyTestOperator(task_id='mytest_task', dag=dag)
....

mytest_operator.py:

from airflow.models import BaseOperator
from hooks.mytest_hook import MyTestHook

class MyTestOperator(BaseOperator):
    ....
    hook = MyTestHook(....)
    ....

mytest_hook.py:

class MyTestHook():
    ....

This worked for me and was much simpler than trying to subclass AirflowPlugin. However it might not work for you if you want changes to the webserver UI:

Note: The Plugins mechanism still must be used for plugins that make changes to the webserver UI.

As an aside, the errors I was getting before this (that are now resolved):

ModuleNotFoundError: No module named 'mytest_plugin.hooks.mytest_hook'
ModuleNotFoundError: No module named 'operators.mytest_plugin'

BjornO answered Oct 02 '22 03:10


I use Airflow 1.10. If you want to import a custom operator, you can put its file in the Airflow plugins folder and then specify the import in the DAG as:

from [filename] import [classname]

where filename is the name of your plugin file (without the .py extension) and classname is the name of your class.

For example, if the name of your file is my_first_plugin.py and the name of the class is MyFirstOperator, the import would be:

from my_first_plugin import MyFirstOperator

Worked for me as I am using airflow 1.10

Thanks ! Hope this helps !!

Sneha K answered Sep 30 '22 03:09


The article defines the plugin like this:

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]

Instead use:

class MyFirstPlugin(AirflowPlugin):
    name = "my_first_plugin"
    operators = [MyFirstOperator]
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of class(es) derived from BaseExecutor
    executors = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of objects created from a class derived
    # from flask_admin.BaseView
    admin_views = []
    # A list of Blueprint object created from flask.Blueprint
    flask_blueprints = []
    # A list of menu links (flask_admin.base.MenuLink)
    menu_links = []

Also don't use:

from airflow.operators import MyFirstOperator

According to the airflow article on plugins, it should be:

from airflow.operators.my_first_plugin import MyFirstOperator

If that doesn't work try:

from airflow.operators.my_operators import MyFirstOperator

If that doesn't work, check your web server log on startup for more information.

jhnclvr answered Oct 04 '22 03:10


Airflow version 2 introduced a new mechanism for plugin management as stated in their official documentation:

Changed in version 2.0: Importing operators, sensors, hooks added in plugins via airflow.{operators,sensors, hooks}.<plugin_name> is no longer supported, and these extensions should just be imported as regular python modules. For more information, see: Modules Management and Creating a custom Operator

All you need to do is put your code in the plugins folder and then write import paths relative to that folder. Suppose you have written TestClass in the file test.py, located at $AIRFLOW_HOME/plugins/t_plugin/operators/test.py. In the DAG file you can import it this way:

from t_plugin.operators.test import TestClass
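
To make the path-to-import mapping explicit, here is a small sketch. The helper name import_path and the /opt/airflow location are assumptions made for illustration; neither is part of Airflow.

```python
from pathlib import PurePosixPath


def import_path(plugins_folder: str, source_file: str) -> str:
    """Return the dotted module path of source_file relative to plugins_folder.

    Hypothetical helper for illustration; raises ValueError if the file
    does not live under plugins_folder.
    """
    relative = PurePosixPath(source_file).relative_to(PurePosixPath(plugins_folder))
    return ".".join(relative.with_suffix("").parts)


# /opt/airflow stands in for $AIRFLOW_HOME here (an assumed location).
module = import_path(
    "/opt/airflow/plugins",
    "/opt/airflow/plugins/t_plugin/operators/test.py",
)
print(f"from {module} import TestClass")
```

The part of the path above the plugins folder is dropped, and each remaining directory becomes one dotted component of the module name.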

smbanaei answered Oct 02 '22 03:10


I restarted the webserver, and now everything works fine.

Here is what I think might have happened:

  1. Before I started with the tutorial example, I tried running my own plugin and dag. There was a minor syntax error on the first run that I fixed, however after the fix I started getting the 'cannot import name' error.
  2. I deleted the plugin and dag, and tried using the one from the tutorial to see what was going on.

My guess is that the error from step 1 somehow affected step 2.

Christopher Carlson answered Oct 01 '22 03:10


I had to update the plugin path in file airflow.cfg in order to fix the problem.

Where your Airflow plugins are stored:

plugins_folder = /airflow/plugins
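
If you want to check which value is actually configured, one option is to read the file with Python's standard configparser. This is a minimal sketch that parses an inline sample rather than a real airflow.cfg; the sample contents are assumed, apart from plugins_folder being a key of the [core] section.

```python
import configparser

# Inline stand-in for airflow.cfg; the values are illustrative.
SAMPLE_CFG = """
[core]
dags_folder = /airflow/dags
plugins_folder = /airflow/plugins
"""

# Interpolation is disabled so that literal % characters, which can
# appear in a real airflow.cfg, do not trip up the parser.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(SAMPLE_CFG)

plugins_folder = parser.get("core", "plugins_folder")
print(plugins_folder)
```

For a real installation you would call parser.read() on the path of your own airflow.cfg instead of read_string().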

Łukasz Włodarczyk answered Oct 04 '22 03:10


I encountered the same error while following these tutorials.

My mistake, however, was that I had used a space character ' ' in task_id, which isn't supported by Airflow.

Clearly the error didn't point towards the actual problem. Restarting both Airflow scheduler and webserver then showed the correct error message on WebUI.
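
A quick pre-check can catch this before the DAG is loaded. The sketch below is an approximation, not Airflow's own validation code: it assumes a task_id may only contain ASCII letters, digits, underscores, dots and dashes, and the function name is made up for illustration.

```python
import re

# Assumed rule: letters, digits, underscores, dots and dashes only.
# This approximates Airflow's check; it is not Airflow's implementation.
TASK_ID_PATTERN = re.compile(r"[A-Za-z0-9_.-]+")


def is_valid_task_id(task_id: str) -> bool:
    """Return True if task_id consists only of the assumed allowed characters."""
    return TASK_ID_PATTERN.fullmatch(task_id) is not None


for candidate in ("mytest_task", "MyTest Task"):
    print(candidate, is_valid_task_id(candidate))
```

Under this rule 'MyTest Task' is rejected because of the space, while 'mytest_task' passes.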

y2k-shubham answered Oct 01 '22 03:10


As per the docs:

The python modules in the plugins folder get imported, and hooks, operators, sensors, macros, executors and web views get integrated to Airflow’s main collections and become available for use.

This works fine in version 1.10.1.

Sachin Kolige answered Oct 03 '22 03:10