The airflow.cfg settings related to Celery are:
broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
celery_result_backend = db+postgresql://developer:password@postgres_server:5432/db_name
The Airflow webserver runs fine, but when I run a task from the Airflow UI I get an error.
I also get an error while running the Airflow scheduler; the traceback is:
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1988, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1641, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1544, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1639, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1625, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 125, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 172, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 952, in run
    executor.heartbeat()
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 124, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 80, in execute_async
    args=[command], queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
    **options
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 734, in send_task
    with self.producer_or_acquire(producer) as P:
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 863, in producer_or_acquire
    producer, self.producer_pool.acquire, block=True,
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 1233, in producer_pool
    return self.amqp.producer_pool
  File "/usr/local/lib/python2.7/dist-packages/celery/app/amqp.py", line 614, in producer_pool
    self.app.connection_for_write()]
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 760, in connection_for_write
    return self._connection(url or self.conf.broker_write_url, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 828, in _connection
    'broker_connection_timeout', connect_timeout
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 179, in __init__
    if not get_transport_cls(transport).can_parse_url:
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 83, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 64, in resolve_transport
    raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: '
My module versions are:
You can run a task independently of its dependencies by passing the -i, -I, or -A flag to the airflow run command.
max_active_runs_per_dag : Determines the maximum number of active DAG runs (per DAG) that the Airflow scheduler can create at a time. In Airflow, a DAG run represents an instantiation of a DAG in time, much like a task instance represents an instantiation of a task.
concurrency : This is the maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. For example, one DAG can be allowed to run 32 tasks at once, while another DAG is limited to 16 tasks at once.
One can take a different approach by increasing the number of threads available on the machine that runs the scheduler process so that the max_threads parameter can be set to a higher value. With a higher value, the Airflow scheduler will be able to more effectively process the increased number of DAGs.
I wasted a lot of time on this issue. The cause of the error was the quotation marks in broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
Simply removing the quotes, so that the setting reads broker_url = amqp://guest:guest@rabbitmq_server:8080
solved the problem.
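To see why the quotes matter, here is a minimal sketch using only the Python 3 standard library. It assumes that broker_url lives in a [celery] section and that the value is read INI-style and later parsed as a URL; the helper name broker_scheme is hypothetical and is not part of Airflow or kombu. INI parsers keep quotation marks as part of the value, so the URL no longer starts with a valid scheme, which would explain the empty transport name in the KeyError.

```python
import configparser
from urllib.parse import urlsplit

# Two hypothetical airflow.cfg fragments: the original (quoted) and the fix (unquoted).
# The [celery] section name is an assumption for this sketch.
QUOTED = "[celery]\nbroker_url = 'amqp://guest:guest@rabbitmq_server:8080'\n"
UNQUOTED = "[celery]\nbroker_url = amqp://guest:guest@rabbitmq_server:8080\n"


def broker_scheme(cfg_text):
    """Return (raw value, URL scheme) for broker_url in an INI-style config string."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    # configparser does not strip quotes; they stay part of the value.
    value = parser.get("celery", "broker_url")
    # A leading quote is not a valid scheme character, so the scheme comes back empty.
    return value, urlsplit(value).scheme


quoted_value, quoted_scheme = broker_scheme(QUOTED)
plain_value, plain_scheme = broker_scheme(UNQUOTED)

print(repr(quoted_value), repr(quoted_scheme))
print(repr(plain_value), repr(plain_scheme))
```

With the quoted value the scheme is empty, while the unquoted value yields amqp, which is consistent with the error message ending in an empty transport name.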