I tried to run the Python Operator Example in my Airflow installation. The installation has deployed webserver, scheduler and worker on the same machine and runs with no complaints for all non-PytohnOperator tasks. The task fails, complaining that the module "unusual_prefix_*" could not be imported, where * is the name of the file containing the DAG.
The full stacktrace:
['/usr/bin/airflow', 'run', 'tutorialpy', 'print_the_context', '2016-08-23T10:00:00', '--pickle', '90', '--local']
[2016-08-29 11:35:20,054] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-08-29 11:35:20,175] {configuration.py:508} WARNING - section/key [core/fernet_key] not found in config
Traceback (most recent call last):
File "/usr/bin/airflow", line 90, in <module>
args.func(args)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/airflow/bin/cli.py", line 214, in run
DagPickle).filter(DagPickle.id == args.pickle).first()
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2659, in first
ret = list(self[0:1])
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2457, in __getitem__
return list(res)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 86, in instances
util.raise_from_cause(err)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 71, in instances
rows = [proc(row) for row in fetch]
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 428, in _instance
loaded_instance, populate_existing, populators)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/orm/loading.py", line 486, in _populate_full
dict_[key] = getter(row)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/sqlalchemy/sql/sqltypes.py", line 1253, in process
return loads(value)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 260, in loads
return load(file)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 250, in load
obj = pik.load()
File "/usr/lib/python2.7/pickle.py", line 858, in load
dispatch[key](self)
File "/usr/lib/python2.7/pickle.py", line 1090, in load_global
klass = self.find_class(module, name)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/dill/dill.py", line 406, in find_class
return StockUnpickler.find_class(self, module, name)
File "/usr/lib/python2.7/pickle.py", line 1124, in find_class
__import__(module)
ImportError: No module named unusual_prefix_tutorialpy
[2016-08-29 11:35:20,498: ERROR/Worker-1] Command 'airflow run tutorialpy print_the_context 2016-08-23T10:00:00 --pickle 90 --local ' returned non-zero exit status 1
[2016-08-29 11:35:20,502: ERROR/MainProcess] Task airflow.executors.celery_executor.execute_command[01152b52-044e-4361-888c-ef2d45983c60] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/opt/mf-airflow/airflow/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 45, in execute_command
raise AirflowException('Celery command failed')
AirflowException: Celery command failed
Using donot_pickle=True
option (see Documentation) I got rid of the error. Probably not the best solution, but good enough for my scenario.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With