
Airflow Running task from UI, KeyError: No such transport

The Celery-related settings in airflow.cfg are:

broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
celery_result_backend = db+postgresql://developer:password@postgres_server:5432/db_name

The Airflow webserver runs fine, but when I run a task from the Airflow UI I get an error.

I get the error while running airflow scheduler; the traceback is:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1988, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1641, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1544, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1639, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python2.7/dist-packages/flask/app.py", line 1625, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 69, in inner
    return self._run_view(f, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_admin/base.py", line 368, in _run_view
    return fn(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/flask_login.py", line 755, in decorated_view
    return func(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 125, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/utils.py", line 172, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/airflow/www/views.py", line 952, in run
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/base_executor.py", line 124, in heartbeat
    self.execute_async(key, command=command, queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/airflow/executors/celery_executor.py", line 80, in execute_async
    args=[command], queue=queue)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/task.py", line 536, in apply_async
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 734, in send_task
    with self.producer_or_acquire(producer) as P:
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 863, in producer_or_acquire
    producer, self.producer_pool.acquire, block=True,
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 1233, in producer_pool
    return self.amqp.producer_pool
  File "/usr/local/lib/python2.7/dist-packages/celery/app/amqp.py", line 614, in producer_pool
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 760, in connection_for_write
    return self._connection(url or self.conf.broker_write_url, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/base.py", line 828, in _connection
    'broker_connection_timeout', connect_timeout
  File "/usr/local/lib/python2.7/dist-packages/kombu/connection.py", line 179, in __init__
    if not get_transport_cls(transport).can_parse_url:
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 83, in get_transport_cls
    _transport_cache[transport] = resolve_transport(transport)
  File "/usr/local/lib/python2.7/dist-packages/kombu/transport/__init__.py", line 64, in resolve_transport
    raise KeyError('No such transport: {0}'.format(transport))
KeyError: u'No such transport: '

My module versions are:

  1. airflow==1.8
  2. celery==4.1.0
  3. kombu==4.1.0
  4. python==2.7.12
javed asked Aug 02 '17 08:08

1 Answer

I wasted a lot of time on this issue. The cause of the error was the quotation marks in broker_url = 'amqp://guest:guest@rabbitmq_server:8080'. Removing the quotes, so that the line reads broker_url = amqp://guest:guest@rabbitmq_server:8080, solved the problem.
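
Why the quotes matter can be sketched with the standard library alone. This is an illustration under assumptions, not Airflow's or kombu's actual code path: it assumes airflow.cfg is read by an INI-style parser that keeps quote characters as part of the value, that broker_url sits in a [celery] section, and that the transport name is derived from the URL scheme (the traceback ends in get_transport_cls(transport) with an empty transport name). It is written for Python 3, whereas the question used Python 2.7.

```python
import configparser
from urllib.parse import urlparse

# Assumed fragment of airflow.cfg, with the quoted value from the question.
CFG_TEXT = """
[celery]
broker_url = 'amqp://guest:guest@rabbitmq_server:8080'
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# INI parsing does not strip quotes, so they stay part of the value.
raw_value = parser.get("celery", "broker_url")

# Hypothetical cleanup step, shown only for comparison.
stripped_value = raw_value.strip("'\"")

# A URL scheme must start with a letter, so the leading quote
# leaves the parsed scheme empty.
quoted_scheme = urlparse(raw_value).scheme
clean_scheme = urlparse(stripped_value).scheme

print(repr(raw_value))
print(repr(quoted_scheme))
print(repr(clean_scheme))
```

An empty scheme for the quoted value is consistent with the empty name in KeyError: u'No such transport: ', while the unquoted value yields amqp.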

javed answered Sep 26 '22 10:09
