
Airflow Scheduler and Webserver hang while queuing a task to run on RabbitMQ

I am struggling to get the Airflow worker to run tasks. I started these services:

airflow worker --debug
airflow webserver
airflow scheduler
airflow flower #to check celery queues in UI at localhost:5555

These processes start fine, but when the scheduler adds a task to the queue, or when I trigger a task from the Airflow UI, the scheduler and webserver hang (they keep loading and never proceed any further) while adding the task to the queue:

  • The scheduler hangs
  • The Airflow UI webserver hangs
  • Workers are not receiving tasks from the RabbitMQ queue
  • The Flower server hangs

I think the issue lies in the communication between the scheduler/webserver and the queue. My broker setting in the airflow.cfg file is: broker_url = amqp://guest:***@ksaprice_rabbitmq:15672// (I have also tried broker_url = pyamqp://guest:***@ksaprice_rabbitmq:15672//). The RabbitMQ server is running fine, and I have verified the login and password credentials as well.

The versions I am using are:

  • airflow==1.8.1
  • celery==4.1
  • rabbitmq server 3.6

I am new to Airflow and RabbitMQ.

Update: The queuing problem was solved by @Jean-Sébastien Pédron's answer, but my workers are still not executing tasks, and Flower does not display the worker even though the airflow worker service is running on port 8793.

RabbitMQ report:

Status of node ksaprice_rabbitmq@4eed789778c0
[{pid,233},
 {running_applications,
     [{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
      {rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
      {rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
      {amqp_client,"RabbitMQ AMQP Client","3.6.10"},
      {cowboy,"Small, fast, modular HTTP server.","1.0.4"},
      {cowlib,"Support library for manipulating Web protocols.","1.0.2"},
      {inets,"INETS  CXC 138 49","6.3.4"},
      {rabbit,"RabbitMQ","3.6.10"},
      {mnesia,"MNESIA  CXC 138 12","4.14.2"},
      {rabbit_common,
          "Modules shared by rabbitmq-server and rabbitmq-erlang-client",
          "3.6.10"},
      {compiler,"ERTS  CXC 138 10","7.0.3"},
      {os_mon,"CPO  CXC 138 46","2.4.1"},
      {ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
      {ssl,"Erlang/OTP SSL application","8.1"},
      {public_key,"Public key infrastructure","1.3"},
      {crypto,"CRYPTO","3.7.2"},
      {xmerl,"XML parser","1.3.12"},
      {asn1,"The Erlang ASN1 compiler version 4.0.4","4.0.4"},
      {syntax_tools,"Syntax tools","2.1.1"},
      {sasl,"SASL  CXC 138 11","3.0.2"},
      {stdlib,"ERTS  CXC 138 10","3.2"},
      {kernel,"ERTS  CXC 138 10","5.1.1"}]},
 {os,{unix,linux}},
 {erlang_version,
     "Erlang/OTP 19 [erts-8.2.1] [source] [64-bit] [smp:2:2] [async-threads:64] [hipe] [kernel-poll:true]\n"},
 {memory,
     [{total,77018832},
      {connection_readers,334888},
      {connection_writers,14640},
      {connection_channels,132040},
      {connection_other,477152},
      {queue_procs,65480},
      {queue_slave_procs,0},
      {plugins,2287080},
      {other_proc,19854000},
      {mnesia,77272},
      {metrics,239992},
      {mgmt_db,852688},
      {msg_index,44208},
      {other_ets,2577600},
      {binary,3923976},
      {code,24680786},
      {atom,1033401},
      {other_system,20660789}]},
 {alarms,[]},
 {listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]},
 {vm_memory_high_watermark,0.4},
 {vm_memory_limit,830581964},
 {disk_free_limit,50000000},
 {disk_free,56083853312},
 {file_descriptors,
     [{total_limit,1048476},
      {total_used,13},
      {sockets_limit,943626},
      {sockets_used,10}]},
 {processes,[{limit,1048576},{used,420}]},
 {run_queue,0},
 {uptime,45431},
 {kernel,{net_ticktime,60}}]

Cluster status of node ksaprice_rabbitmq@4eed789778c0
[{nodes,[{disc,[ksaprice_rabbitmq@4eed789778c0]}]},
 {running_nodes,[ksaprice_rabbitmq@4eed789778c0]},
 {cluster_name,<<"ksaprice_rabbitmq@4eed789778c0">>},
 {partitions,[]},
 {alarms,[{ksaprice_rabbitmq@4eed789778c0,[]}]}]

Application environment of node ksaprice_rabbitmq@4eed789778c0
[{amqp_client,[{prefer_ipv6,false},{ssl_options,[]}]},
 {asn1,[]},
 {compiler,[]},
 {cowboy,[]},
 {cowlib,[]},
 {crypto,[]},
 {inets,[]},
 {kernel,
     [{error_logger,tty},
      {inet_default_connect_options,[{nodelay,true}]},
      {inet_dist_listen_max,25672},
      {inet_dist_listen_min,25672}]},
 {mnesia,[{dir,"/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq"}]},
 {os_mon,
     [{start_cpu_sup,false},
      {start_disksup,false},
      {start_memsup,false},
      {start_os_sup,false}]},
 {public_key,[]},
 {rabbit,
     [{auth_backends,[rabbit_auth_backend_internal]},
      {auth_mechanisms,['PLAIN','AMQPLAIN']},
      {background_gc_enabled,false},
      {background_gc_target_interval,60000},
      {backing_queue_module,rabbit_priority_queue},
      {channel_max,0},
      {channel_operation_timeout,15000},
      {cluster_keepalive_interval,10000},
      {cluster_nodes,{[],disc}},
      {cluster_partition_handling,ignore},
      {collect_statistics,fine},
      {collect_statistics_interval,5000},
      {config_entry_decoder,
          [{cipher,aes_cbc256},
           {hash,sha512},
           {iterations,1000},
           {passphrase,undefined}]},
      {credit_flow_default_credit,{400,200}},
      {default_permissions,[<<".*">>,<<".*">>,<<".*">>]},
      {default_user,<<"guest">>},
      {default_user_tags,[administrator]},
      {default_vhost,<<"/">>},
      {delegate_count,16},
      {disk_free_limit,50000000},
      {disk_monitor_failure_retries,10},
      {disk_monitor_failure_retry_interval,120000},
      {enabled_plugins_file,"/etc/rabbitmq/enabled_plugins"},
      {error_logger,tty},
      {fhc_read_buffering,false},
      {fhc_write_buffering,true},
      {frame_max,131072},
      {halt_on_upgrade_failure,true},
      {handshake_timeout,10000},
      {heartbeat,60},
      {hipe_compile,false},
      {hipe_modules,
          [rabbit_reader,rabbit_channel,gen_server2,rabbit_exchange,
           rabbit_command_assembler,rabbit_framing_amqp_0_9_1,rabbit_basic,
           rabbit_event,lists,queue,priority_queue,rabbit_router,rabbit_trace,
           rabbit_misc,rabbit_binary_parser,rabbit_exchange_type_direct,
           rabbit_guid,rabbit_net,rabbit_amqqueue_process,
           rabbit_variable_queue,rabbit_binary_generator,rabbit_writer,
           delegate,gb_sets,lqueue,sets,orddict,rabbit_amqqueue,
           rabbit_limiter,gb_trees,rabbit_queue_index,
           rabbit_exchange_decorator,gen,dict,ordsets,file_handle_cache,
           rabbit_msg_store,array,rabbit_msg_store_ets_index,rabbit_msg_file,
           rabbit_exchange_type_fanout,rabbit_exchange_type_topic,mnesia,
           mnesia_lib,rpc,mnesia_tm,qlc,sofs,proplists,credit_flow,pmon,
           ssl_connection,tls_connection,ssl_record,tls_record,gen_fsm,ssl]},
      {lazy_queue_explicit_gc_run_operation_threshold,1000},
      {log_levels,[{connection,info}]},
      {loopback_users,[]},
      {memory_monitor_interval,2500},
      {mirroring_flow_control,true},
      {mirroring_sync_batch_size,4096},
      {mnesia_table_loading_retry_limit,10},
      {mnesia_table_loading_retry_timeout,30000},
      {msg_store_credit_disc_bound,{4000,800}},
      {msg_store_file_size_limit,16777216},
      {msg_store_index_module,rabbit_msg_store_ets_index},
      {msg_store_io_batch_size,4096},
      {num_ssl_acceptors,1},
      {num_tcp_acceptors,10},
      {password_hashing_module,rabbit_password_hashing_sha256},
      {plugins_dir,
          "/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/plugins"},
      {plugins_expand_dir,
          "/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq-plugins-expand"},
      {queue_explicit_gc_run_operation_threshold,1000},
      {queue_index_embed_msgs_below,4096},
      {queue_index_max_journal_entries,32768},
      {reverse_dns_lookups,false},
      {sasl_error_logger,tty},
      {server_properties,[]},
      {ssl_allow_poodle_attack,false},
      {ssl_apps,[asn1,crypto,public_key,ssl]},
      {ssl_cert_login_from,distinguished_name},
      {ssl_handshake_timeout,5000},
      {ssl_listeners,[]},
      {ssl_options,[]},
      {tcp_listen_options,
          [{backlog,128},
           {nodelay,true},
           {linger,{true,0}},
           {exit_on_close,false}]},
      {tcp_listeners,[5672]},
      {trace_vhosts,[]},
      {vm_memory_high_watermark,0.4},
      {vm_memory_high_watermark_paging_ratio,0.5}]},
 {rabbit_common,[]},
 {rabbitmq_management,
     [{cors_allow_origins,[]},
      {cors_max_age,1800},
      {http_log_dir,none},
      {listener,[{port,15672}]},
      {load_definitions,none},
      {management_db_cache_multiplier,5},
      {process_stats_gc_timeout,300000},
      {stats_event_max_backlog,250}]},
 {rabbitmq_management_agent,
     [{rates_mode,basic},
      {sample_retention_policies,
          [{global,[{605,5},{3660,60},{29400,600},{86400,1800}]},
           {basic,[{605,5},{3600,60}]},
           {detailed,[{605,5}]}]}]},
 {rabbitmq_web_dispatch,[]},
 {ranch,[]},
 {sasl,[{errlog_type,error},{sasl_error_logger,tty}]},
 {ssl,[]},
 {stdlib,[]},
 {syntax_tools,[]},
 {xmerl,[]}]

Connections:
pid     name    port    peer_port       host    peer_host       ssl     peer_cert_subject       peer_cert_issuer        peer_cert_validity      auth_mechanism  ssl_protocol    ssl_key_exchange        ssl_cipher      ssl_hash        protocol        user    vhost   timeout frame_max       channel_max     client_properties     connected_at    recv_oct        recv_cnt        send_oct        send_cnt        send_pend       state   channels        reductions      garbage_collection
<[email protected]>       172.25.0.2:47982 -> 172.25.0.4:5672     5672    47982   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749265595   1897  10      606     7       0       running 1       235055  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,170}]
<[email protected]>       172.25.0.2:48764 -> 172.25.0.4:5672     5672    48764   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346461   289   5       554     4       0       running 1       226409  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]
<[email protected]>       172.25.0.2:48766 -> 172.25.0.4:5672     5672    48766   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346494   1647  21      1030    20      0       running 1       228859  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<[email protected]>       172.25.0.2:48768 -> 172.25.0.4:5672     5672    48768   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346494   569   9       662     8       0       running 1       226947  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,164}]
<[email protected]>       172.25.0.2:48770 -> 172.25.0.4:5672     5672    48770   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346495   1647  20      1030    20      0       running 1       228798  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,167}]
<[email protected]>       172.25.0.2:48772 -> 172.25.0.4:5672     5672    48772   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346511   85953 485     1042    21      0       running 1       280680  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,110}]
<[email protected]>       172.25.0.2:48774 -> 172.25.0.4:5672     5672    48774   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346548   665   7       566     5       0       running 1       226665  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,168}]
<[email protected]>       172.25.0.2:48776 -> 172.25.0.4:5672     5672    48776   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346551   1647  21      1030    20      0       running 1       228859  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<[email protected]>       172.25.0.2:48780 -> 172.25.0.4:5672     5672    48780   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346576   1691  9       566     5       0       running 1       226936  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,169}]
<[email protected]>       172.25.0.2:48778 -> 172.25.0.4:5672     5672    48778   172.25.0.4      172.25.0.2      false                         AMQPLAIN                                        {0,9,1} admin   ksaprice_rabbitmq_vh    0       131072  65535   [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}]      1501749346576   1496  9       566     5       0       running 1       226885  [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]

Channels:
pid     name    connection      number  user    vhost   reductions      transactional   confirm consumer_count  messages_unacknowledged messages_unconfirmed  messages_uncommitted    acks_uncommitted        prefetch_count  global_prefetch_count   state   garbage_collection
<[email protected]>       172.25.0.2:47982 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  4140    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<[email protected]>       172.25.0.2:48764 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  1706    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<[email protected]>       172.25.0.2:48768 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  4737    false   false   1       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]
<[email protected]>       172.25.0.2:48770 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  8608    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<[email protected]>       172.25.0.2:48766 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  7977    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<[email protected]>       172.25.0.2:48772 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  116017  false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,1}]
<[email protected]>       172.25.0.2:48776 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  7977    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<[email protected]>       172.25.0.2:48774 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  3048    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<[email protected]>       172.25.0.2:48778 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  2854    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<[email protected]>       172.25.0.2:48780 -> 172.25.0.4:5672 (1) <[email protected]>       1       admin   ksaprice_rabbitmq_vh  3245    false   false   0       0       0       0       0       0       0       running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]

Queues on ksaprice_rabbitmq_vh:
pid     name    durable auto_delete     arguments       owner_pid       exclusive       messages_ready  messages_unacknowledged messages        reductions    policy  exclusive_consumer_pid  exclusive_consumer_tag  consumers       consumer_utilisation    memory  slave_pids      synchronised_slave_pids recoverable_slaves    state   garbage_collection      messages_ram    messages_ready_ram      messages_unacknowledged_ram     messages_persistent     message_bytes   message_bytes_ready   message_bytes_unacknowledged    message_bytes_ram       message_bytes_persistent        head_message_timestamp  disk_reads      disk_writes   backing_queue_status    messages_paged_out      message_bytes_paged_out
<[email protected]>       default true    false   []              false   6       0       6       88075                           0             89344                           running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]  6       6       0       6       1231    1231    0       1231    1231            0       6       [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,6}, {len,6}, {target_ram_count,infinity}, {next_seq_id,6}, {avg_ingress_rate,9.303060867567184e-92}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]    0       0
<[email protected]>       celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   false   true    [{"x-message-ttl",5000},{"x-expires",60000}]          false   0       0       0       4739                            1       1.0     22160                           running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]  0       0       0       0       0       0       0       0       0             0       0       [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {target_ram_count,infinity}, {next_seq_id,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]        0       0

Queues on ksaprice_rabbitmq:

Queues on /:

Exchanges on ksaprice_rabbitmq_vh:
name    type    durable auto_delete     internal        arguments       policy
        direct  true    false   false   []
amq.direct      direct  true    false   false   []
amq.fanout      fanout  true    false   false   []
amq.headers     headers true    false   false   []
amq.match       headers true    false   false   []
amq.rabbitmq.trace      topic   true    false   true    []
amq.topic       topic   true    false   false   []
celery.pidbox   fanout  false   false   false   []
celeryev        topic   true    false   false   []
default direct  true    false   false   []
reply.celery.pidbox     direct  false   false   false   []

Exchanges on ksaprice_rabbitmq:
name    type    durable auto_delete     internal        arguments       policy
        direct  true    false   false   []
amq.direct      direct  true    false   false   []
amq.fanout      fanout  true    false   false   []
amq.headers     headers true    false   false   []
amq.match       headers true    false   false   []
amq.rabbitmq.trace      topic   true    false   true    []
amq.topic       topic   true    false   false   []

Exchanges on /:
name    type    durable auto_delete     internal        arguments       policy
        direct  true    false   false   []
amq.direct      direct  true    false   false   []
amq.fanout      fanout  true    false   false   []
amq.headers     headers true    false   false   []
amq.match       headers true    false   false   []
amq.rabbitmq.log        topic   true    false   true    []
amq.rabbitmq.trace      topic   true    false   true    []
amq.topic       topic   true    false   false   []

Bindings on ksaprice_rabbitmq_vh:
source_name     source_kind     destination_name        destination_kind        routing_key     arguments       vhost
        exchange        celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   queue   celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   []      ksaprice_rabbitmq_vh
        exchange        default queue   default []      ksaprice_rabbitmq_vh
celeryev        exchange        celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   queue   #       []      ksaprice_rabbitmq_vh
default exchange        default queue   default []      ksaprice_rabbitmq_vh

Bindings on ksaprice_rabbitmq:

Bindings on /:

Consumers on ksaprice_rabbitmq_vh:
queue_name      channel_pid     consumer_tag    ack_required    prefetch_count  arguments
celeryev.b957bbf3-8b97-4633-897f-a887b49e617b   <[email protected]>       None4   false   0       []

Consumers on ksaprice_rabbitmq:

Consumers on /:

Permissions on ksaprice_rabbitmq_vh:
user    configure       write   read
admin   .*      .*      .*

Permissions on ksaprice_rabbitmq:

Permissions on /:
user    configure       write   read
guest   .*      .*      .*

Policies on ksaprice_rabbitmq_vh:

Policies on ksaprice_rabbitmq:

Policies on /:

Parameters on ksaprice_rabbitmq_vh:

Parameters on ksaprice_rabbitmq:

Parameters on /:
javed asked Oct 18 '22 08:10


1 Answer

About Airflow hanging

I don't know Airflow, but I believe the URL you are using to target RabbitMQ is incorrect:

broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//

RabbitMQ uses TCP port 15672 for its management web UI, so it's an HTTP server listening there.

The AMQP port is 5672 (the standard one). So I would try with the following URL:

broker_url = amqp://guest:***@ksaprice_rabbitmq//

I.e. without the port because the client should default to the standard one.
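The port mix-up can be caught before starting any service. The following is only a minimal sketch, not part of Airflow or Celery: the helper name check_broker_url is hypothetical, and the two port constants are taken from the listeners entry in the report (amqp on 5672, http on 15672). It assumes the broker URL has the usual scheme://user:password@host:port/vhost shape.

```python
from urllib.parse import urlsplit

# Ports as shown in the `listeners` entry of the rabbitmqctl report.
AMQP_PORT = 5672         # AMQP listener, what the broker URL should target
MANAGEMENT_PORT = 15672  # HTTP management UI, not an AMQP endpoint


def check_broker_url(url):
    """Return a list of warning strings for a broker URL (hypothetical helper).

    An empty list means no problem was detected by this check.
    """
    parts = urlsplit(url)
    # When the URL carries no explicit port, the client is expected to
    # fall back to the standard AMQP port.
    port = parts.port if parts.port is not None else AMQP_PORT

    warnings = []
    if port == MANAGEMENT_PORT:
        warnings.append(
            "port %d is the RabbitMQ management UI (HTTP); "
            "use %d or omit the port" % (MANAGEMENT_PORT, AMQP_PORT)
        )
    return warnings


if __name__ == "__main__":
    # "secret" is a placeholder password used only for this illustration.
    for candidate in (
        "amqp://guest:secret@ksaprice_rabbitmq:15672//",
        "amqp://guest:secret@ksaprice_rabbitmq//",
    ):
        print(candidate, "->", check_broker_url(candidate) or "looks fine")
```

The check is deliberately narrow: it only flags the management port and says nothing about credentials, the vhost, or whether the host is reachable.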

About tasks not being executed by the worker(s)

In the rabbitmqctl report output, we can see that:

  • There is a queue named default with 6 messages waiting, but no consumers subscribed to it.
  • There is a queue named celeryev.b957bbf3-8b97-4633-897f-a887b49e617b with no messages and one consumer waiting for messages.

So there must be some incorrect or missing configuration on either side (scheduler or worker), because they are not talking to each other currently: tasks arrive on one queue, but the worker is watching another queue.
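The same observation can be automated. Here is a rough sketch that scans the output of rabbitmqctl list_queues -p ksaprice_rabbitmq_vh name messages consumers and reports queues that hold messages but have no consumer. The function name find_unconsumed_queues is hypothetical, and the parsing assumes tab-separated rows of name, message count, and consumer count; any other line (such as a "Listing queues" banner) is skipped.

```python
def find_unconsumed_queues(list_queues_output):
    """Return names of queues with waiting messages and zero consumers.

    Expects rows of the form "name<TAB>messages<TAB>consumers"
    (an assumption about the rabbitmqctl output format).
    """
    stuck = []
    for line in list_queues_output.splitlines():
        fields = line.strip().split("\t")
        # Skip banners, blank lines, and anything that is not a data row.
        if len(fields) != 3 or not (fields[1].isdigit() and fields[2].isdigit()):
            continue
        name, messages, consumers = fields[0], int(fields[1]), int(fields[2])
        if messages > 0 and consumers == 0:
            stuck.append(name)
    return stuck


if __name__ == "__main__":
    # Sample rows mirroring the two queues seen in the report.
    sample = (
        "Listing queues\n"
        "default\t6\t0\n"
        "celeryev.b957bbf3-8b97-4633-897f-a887b49e617b\t0\t1\n"
    )
    print(find_unconsumed_queues(sample))
```

A queue returned by this function is one that a producer is writing to but no worker is reading from, which is the situation described for the default queue.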

Jean-Sébastien Pédron answered Oct 21 '22 09:10