I have followed
this tutorial in attempt to build an airflow cluster on localhost with my own DAGs. When I ran airflow scheduler
after having set executor = CeleryExecutor
in the config file, I received the following traceback:
Traceback (most recent call last):
File "/home/yurii/Tools/anaconda3/bin/airflow", line 28, in args.func(args)
File"/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py", line 839, in scheduler job.run()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 200, in run self._execute()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1309, in _execute self._execute_helper(processor_manager)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py", line 1441, in _execute_helper self.executor.heartbeat()
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 124, in heartbeat self.execute_async(key, command=command, queue=queue)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 80, in execute_async args=[command], queue=queue)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py", line 573, in apply_async **dict(self._get_exec_options(), **options)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py", line 354, in send_task reply_to=reply_to or self.oid, **options
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py", line 310, in publish_task **kwargs
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 172, in publish routing_key, mandatory, immediate, exchange, declare)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py", line 449, in _ensured return fun(*args, **kwargs)
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py", line 188, in _publish mandatory=mandatory, immediate=immediate,
File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init.py", line 122, in basic_publish mandatory or False, immediate or False,
TypeError: an integer is required (got type NoneType)
Some additional information:
I am new to Airflow/Celery/RabbitMQ/SQL, so any help would be appreciated!
To add to previous answer. Using py-amqp involves either changing from broker_url = amqp://XXXXX
to broker_url = pyamqp://XXXXX
OR
pip uninstall librabbitmq
.
Additionally you may need to change celery_result_backend
variable to result_backend
in your airflow.cfg
. The celery_
prefix has been removed for variables in the [celery]
node in airflow.cfg
in recent versions.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With