I am struggling to get the Airflow worker to run tasks. I started these services:
airflow worker --debug
airflow webserver
airflow scheduler
airflow flower #to check celery queues in UI at localhost:5555
These processes start fine, but when the scheduler adds a task to the queue, or when I try to run a task from the Airflow UI, the scheduler and webserver hang (they keep loading and never proceed any further) while adding the task to the queue.
I think the issue has to do with the communication between the scheduler/webserver and the queue. My broker setting in the airflow.cfg file is:
broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//
I have also tried:
broker_url = pyamqp://guest:***@ksaprice_rabbitmq:15672//
The RabbitMQ server is running fine, and I have tested the login and password credentials as well.
The versions I am using are:
I am new to Airflow and RabbitMQ.
Update:
My queuing problem was solved by the answer from @Jean-Sébastien Pédron, but my workers are still not executing tasks, and Flower does not display the worker even though the airflow worker service is running on port 8793.
RabbitMQ report:
Status of node ksaprice_rabbitmq@4eed789778c0
[{pid,233},
{running_applications,
[{rabbitmq_management,"RabbitMQ Management Console","3.6.10"},
{rabbitmq_web_dispatch,"RabbitMQ Web Dispatcher","3.6.10"},
{rabbitmq_management_agent,"RabbitMQ Management Agent","3.6.10"},
{amqp_client,"RabbitMQ AMQP Client","3.6.10"},
{cowboy,"Small, fast, modular HTTP server.","1.0.4"},
{cowlib,"Support library for manipulating Web protocols.","1.0.2"},
{inets,"INETS CXC 138 49","6.3.4"},
{rabbit,"RabbitMQ","3.6.10"},
{mnesia,"MNESIA CXC 138 12","4.14.2"},
{rabbit_common,
"Modules shared by rabbitmq-server and rabbitmq-erlang-client",
"3.6.10"},
{compiler,"ERTS CXC 138 10","7.0.3"},
{os_mon,"CPO CXC 138 46","2.4.1"},
{ranch,"Socket acceptor pool for TCP protocols.","1.3.0"},
{ssl,"Erlang/OTP SSL application","8.1"},
{public_key,"Public key infrastructure","1.3"},
{crypto,"CRYPTO","3.7.2"},
{xmerl,"XML parser","1.3.12"},
{asn1,"The Erlang ASN1 compiler version 4.0.4","4.0.4"},
{syntax_tools,"Syntax tools","2.1.1"},
{sasl,"SASL CXC 138 11","3.0.2"},
{stdlib,"ERTS CXC 138 10","3.2"},
{kernel,"ERTS CXC 138 10","5.1.1"}]},
{os,{unix,linux}},
{erlang_version,
"Erlang/OTP 19 [erts-8.2.1] [source] [64-bit] [smp:2:2] [async-threads:64] [hipe] [kernel-poll:true]\n"},
{memory,
[{total,77018832},
{connection_readers,334888},
{connection_writers,14640},
{connection_channels,132040},
{connection_other,477152},
{queue_procs,65480},
{queue_slave_procs,0},
{plugins,2287080},
{other_proc,19854000},
{mnesia,77272},
{metrics,239992},
{mgmt_db,852688},
{msg_index,44208},
{other_ets,2577600},
{binary,3923976},
{code,24680786},
{atom,1033401},
{other_system,20660789}]},
{alarms,[]},
{listeners,[{clustering,25672,"::"},{amqp,5672,"::"},{http,15672,"::"}]},
{vm_memory_high_watermark,0.4},
{vm_memory_limit,830581964},
{disk_free_limit,50000000},
{disk_free,56083853312},
{file_descriptors,
[{total_limit,1048476},
{total_used,13},
{sockets_limit,943626},
{sockets_used,10}]},
{processes,[{limit,1048576},{used,420}]},
{run_queue,0},
{uptime,45431},
{kernel,{net_ticktime,60}}]
Cluster status of node ksaprice_rabbitmq@4eed789778c0
[{nodes,[{disc,[ksaprice_rabbitmq@4eed789778c0]}]},
{running_nodes,[ksaprice_rabbitmq@4eed789778c0]},
{cluster_name,<<"ksaprice_rabbitmq@4eed789778c0">>},
{partitions,[]},
{alarms,[{ksaprice_rabbitmq@4eed789778c0,[]}]}]
Application environment of node ksaprice_rabbitmq@4eed789778c0
[{amqp_client,[{prefer_ipv6,false},{ssl_options,[]}]},
{asn1,[]},
{compiler,[]},
{cowboy,[]},
{cowlib,[]},
{crypto,[]},
{inets,[]},
{kernel,
[{error_logger,tty},
{inet_default_connect_options,[{nodelay,true}]},
{inet_dist_listen_max,25672},
{inet_dist_listen_min,25672}]},
{mnesia,[{dir,"/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq"}]},
{os_mon,
[{start_cpu_sup,false},
{start_disksup,false},
{start_memsup,false},
{start_os_sup,false}]},
{public_key,[]},
{rabbit,
[{auth_backends,[rabbit_auth_backend_internal]},
{auth_mechanisms,['PLAIN','AMQPLAIN']},
{background_gc_enabled,false},
{background_gc_target_interval,60000},
{backing_queue_module,rabbit_priority_queue},
{channel_max,0},
{channel_operation_timeout,15000},
{cluster_keepalive_interval,10000},
{cluster_nodes,{[],disc}},
{cluster_partition_handling,ignore},
{collect_statistics,fine},
{collect_statistics_interval,5000},
{config_entry_decoder,
[{cipher,aes_cbc256},
{hash,sha512},
{iterations,1000},
{passphrase,undefined}]},
{credit_flow_default_credit,{400,200}},
{default_permissions,[<<".*">>,<<".*">>,<<".*">>]},
{default_user,<<"guest">>},
{default_user_tags,[administrator]},
{default_vhost,<<"/">>},
{delegate_count,16},
{disk_free_limit,50000000},
{disk_monitor_failure_retries,10},
{disk_monitor_failure_retry_interval,120000},
{enabled_plugins_file,"/etc/rabbitmq/enabled_plugins"},
{error_logger,tty},
{fhc_read_buffering,false},
{fhc_write_buffering,true},
{frame_max,131072},
{halt_on_upgrade_failure,true},
{handshake_timeout,10000},
{heartbeat,60},
{hipe_compile,false},
{hipe_modules,
[rabbit_reader,rabbit_channel,gen_server2,rabbit_exchange,
rabbit_command_assembler,rabbit_framing_amqp_0_9_1,rabbit_basic,
rabbit_event,lists,queue,priority_queue,rabbit_router,rabbit_trace,
rabbit_misc,rabbit_binary_parser,rabbit_exchange_type_direct,
rabbit_guid,rabbit_net,rabbit_amqqueue_process,
rabbit_variable_queue,rabbit_binary_generator,rabbit_writer,
delegate,gb_sets,lqueue,sets,orddict,rabbit_amqqueue,
rabbit_limiter,gb_trees,rabbit_queue_index,
rabbit_exchange_decorator,gen,dict,ordsets,file_handle_cache,
rabbit_msg_store,array,rabbit_msg_store_ets_index,rabbit_msg_file,
rabbit_exchange_type_fanout,rabbit_exchange_type_topic,mnesia,
mnesia_lib,rpc,mnesia_tm,qlc,sofs,proplists,credit_flow,pmon,
ssl_connection,tls_connection,ssl_record,tls_record,gen_fsm,ssl]},
{lazy_queue_explicit_gc_run_operation_threshold,1000},
{log_levels,[{connection,info}]},
{loopback_users,[]},
{memory_monitor_interval,2500},
{mirroring_flow_control,true},
{mirroring_sync_batch_size,4096},
{mnesia_table_loading_retry_limit,10},
{mnesia_table_loading_retry_timeout,30000},
{msg_store_credit_disc_bound,{4000,800}},
{msg_store_file_size_limit,16777216},
{msg_store_index_module,rabbit_msg_store_ets_index},
{msg_store_io_batch_size,4096},
{num_ssl_acceptors,1},
{num_tcp_acceptors,10},
{password_hashing_module,rabbit_password_hashing_sha256},
{plugins_dir,
"/usr/lib/rabbitmq/plugins:/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.10/plugins"},
{plugins_expand_dir,
"/var/lib/rabbitmq/mnesia/ksaprice_rabbitmq-plugins-expand"},
{queue_explicit_gc_run_operation_threshold,1000},
{queue_index_embed_msgs_below,4096},
{queue_index_max_journal_entries,32768},
{reverse_dns_lookups,false},
{sasl_error_logger,tty},
{server_properties,[]},
{ssl_allow_poodle_attack,false},
{ssl_apps,[asn1,crypto,public_key,ssl]},
{ssl_cert_login_from,distinguished_name},
{ssl_handshake_timeout,5000},
{ssl_listeners,[]},
{ssl_options,[]},
{tcp_listen_options,
[{backlog,128},
{nodelay,true},
{linger,{true,0}},
{exit_on_close,false}]},
{tcp_listeners,[5672]},
{trace_vhosts,[]},
{vm_memory_high_watermark,0.4},
{vm_memory_high_watermark_paging_ratio,0.5}]},
{rabbit_common,[]},
{rabbitmq_management,
[{cors_allow_origins,[]},
{cors_max_age,1800},
{http_log_dir,none},
{listener,[{port,15672}]},
{load_definitions,none},
{management_db_cache_multiplier,5},
{process_stats_gc_timeout,300000},
{stats_event_max_backlog,250}]},
{rabbitmq_management_agent,
[{rates_mode,basic},
{sample_retention_policies,
[{global,[{605,5},{3660,60},{29400,600},{86400,1800}]},
{basic,[{605,5},{3600,60}]},
{detailed,[{605,5}]}]}]},
{rabbitmq_web_dispatch,[]},
{ranch,[]},
{sasl,[{errlog_type,error},{sasl_error_logger,tty}]},
{ssl,[]},
{stdlib,[]},
{syntax_tools,[]},
{xmerl,[]}]
Connections:
pid name port peer_port host peer_host ssl peer_cert_subject peer_cert_issuer peer_cert_validity auth_mechanism ssl_protocol ssl_key_exchange ssl_cipher ssl_hash protocol user vhost timeout frame_max channel_max client_properties connected_at recv_oct recv_cnt send_oct send_cnt send_pend state channels reductions garbage_collection
<[email protected]> 172.25.0.2:47982 -> 172.25.0.4:5672 5672 47982 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749265595 1897 10 606 7 0 running 1 235055 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,170}]
<[email protected]> 172.25.0.2:48764 -> 172.25.0.4:5672 5672 48764 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346461 289 5 554 4 0 running 1 226409 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]
<[email protected]> 172.25.0.2:48766 -> 172.25.0.4:5672 5672 48766 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346494 1647 21 1030 20 0 running 1 228859 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<[email protected]> 172.25.0.2:48768 -> 172.25.0.4:5672 5672 48768 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346494 569 9 662 8 0 running 1 226947 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,164}]
<[email protected]> 172.25.0.2:48770 -> 172.25.0.4:5672 5672 48770 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346495 1647 20 1030 20 0 running 1 228798 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,167}]
<[email protected]> 172.25.0.2:48772 -> 172.25.0.4:5672 5672 48772 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346511 85953 485 1042 21 0 running 1 280680 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,110}]
<[email protected]> 172.25.0.2:48774 -> 172.25.0.4:5672 5672 48774 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346548 665 7 566 5 0 running 1 226665 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,168}]
<[email protected]> 172.25.0.2:48776 -> 172.25.0.4:5672 5672 48776 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346551 1647 21 1030 20 0 running 1 228859 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,171}]
<[email protected]> 172.25.0.2:48780 -> 172.25.0.4:5672 5672 48780 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346576 1691 9 566 5 0 running 1 226936 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,169}]
<[email protected]> 172.25.0.2:48778 -> 172.25.0.4:5672 5672 48778 172.25.0.4 172.25.0.2 false AMQPLAIN {0,9,1} admin ksaprice_rabbitmq_vh 0 131072 65535 [{"product","py-amqp"},{"product_version","2.2.1"},{"capabilities",[{"connection.blocked",true},{"authentication_failure_close",true},{"consumer_cancel_notify",true}]}] 1501749346576 1496 9 566 5 0 running 1 226885 [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,161}]
Channels:
pid name connection number user vhost reductions transactional confirm consumer_count messages_unacknowledged messages_unconfirmed messages_uncommitted acks_uncommitted prefetch_count global_prefetch_count state garbage_collection
<[email protected]> 172.25.0.2:47982 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 4140 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<[email protected]> 172.25.0.2:48764 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 1706 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}]
<[email protected]> 172.25.0.2:48768 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 4737 false false 1 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]
<[email protected]> 172.25.0.2:48770 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 8608 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<[email protected]> 172.25.0.2:48766 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 7977 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<[email protected]> 172.25.0.2:48772 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 116017 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,1}]
<[email protected]> 172.25.0.2:48776 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 7977 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}]
<[email protected]> 172.25.0.2:48774 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 3048 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<[email protected]> 172.25.0.2:48778 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 2854 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,4}]
<[email protected]> 172.25.0.2:48780 -> 172.25.0.4:5672 (1) <[email protected]> 1 admin ksaprice_rabbitmq_vh 3245 false false 0 0 0 0 0 0 0 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,6}]
Queues on ksaprice_rabbitmq_vh:
pid name durable auto_delete arguments owner_pid exclusive messages_ready messages_unacknowledged messages reductions policy exclusive_consumer_pid exclusive_consumer_tag consumers consumer_utilisation memory slave_pids synchronised_slave_pids recoverable_slaves state garbage_collection messages_ram messages_ready_ram messages_unacknowledged_ram messages_persistent message_bytes message_bytes_ready message_bytes_unacknowledged message_bytes_ram message_bytes_persistent head_message_timestamp disk_reads disk_writes backing_queue_status messages_paged_out message_bytes_paged_out
<[email protected]> default true false [] false 6 0 6 88075 0 89344 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,3}] 6 6 0 6 1231 1231 0 1231 1231 0 6 [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,6}, {len,6}, {target_ram_count,infinity}, {next_seq_id,6}, {avg_ingress_rate,9.303060867567184e-92}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}] 0 0
<[email protected]> celeryev.b957bbf3-8b97-4633-897f-a887b49e617b false true [{"x-message-ttl",5000},{"x-expires",60000}] false 0 0 0 4739 1 1.0 22160 running [{max_heap_size,0}, {min_bin_vheap_size,46422}, {min_heap_size,233}, {fullsweep_after,65535}, {minor_gcs,8}] 0 0 0 0 0 0 0 0 0 0 0 [{mode,default}, {q1,0}, {q2,0}, {delta,{delta,undefined,0,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {target_ram_count,infinity}, {next_seq_id,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}] 0 0
Queues on ksaprice_rabbitmq:
Queues on /:
Exchanges on ksaprice_rabbitmq_vh:
name type durable auto_delete internal arguments policy
direct true false false []
amq.direct direct true false false []
amq.fanout fanout true false false []
amq.headers headers true false false []
amq.match headers true false false []
amq.rabbitmq.trace topic true false true []
amq.topic topic true false false []
celery.pidbox fanout false false false []
celeryev topic true false false []
default direct true false false []
reply.celery.pidbox direct false false false []
Exchanges on ksaprice_rabbitmq:
name type durable auto_delete internal arguments policy
direct true false false []
amq.direct direct true false false []
amq.fanout fanout true false false []
amq.headers headers true false false []
amq.match headers true false false []
amq.rabbitmq.trace topic true false true []
amq.topic topic true false false []
Exchanges on /:
name type durable auto_delete internal arguments policy
direct true false false []
amq.direct direct true false false []
amq.fanout fanout true false false []
amq.headers headers true false false []
amq.match headers true false false []
amq.rabbitmq.log topic true false true []
amq.rabbitmq.trace topic true false true []
amq.topic topic true false false []
Bindings on ksaprice_rabbitmq_vh:
source_name source_kind destination_name destination_kind routing_key arguments vhost
exchange celeryev.b957bbf3-8b97-4633-897f-a887b49e617b queue celeryev.b957bbf3-8b97-4633-897f-a887b49e617b [] ksaprice_rabbitmq_vh
exchange default queue default [] ksaprice_rabbitmq_vh
celeryev exchange celeryev.b957bbf3-8b97-4633-897f-a887b49e617b queue # [] ksaprice_rabbitmq_vh
default exchange default queue default [] ksaprice_rabbitmq_vh
Bindings on ksaprice_rabbitmq:
Bindings on /:
Consumers on ksaprice_rabbitmq_vh:
queue_name channel_pid consumer_tag ack_required prefetch_count arguments
celeryev.b957bbf3-8b97-4633-897f-a887b49e617b <[email protected]> None4 false 0 []
Consumers on ksaprice_rabbitmq:
Consumers on /:
Permissions on ksaprice_rabbitmq_vh:
user configure write read
admin .* .* .*
Permissions on ksaprice_rabbitmq:
Permissions on /:
user configure write read
guest .* .* .*
Policies on ksaprice_rabbitmq_vh:
Policies on ksaprice_rabbitmq:
Policies on /:
Parameters on ksaprice_rabbitmq_vh:
Parameters on ksaprice_rabbitmq:
Parameters on /:
I don't know Airflow, but I believe the URL you are using to target RabbitMQ is incorrect:
broker_url = amqp://guest:***@ksaprice_rabbitmq:15672//
RabbitMQ uses TCP port 15672 for its management web UI, so it's an HTTP server listening there.
The AMQP port is 5672 (the standard one). So I would try with the following URL:
broker_url = amqp://guest:***@ksaprice_rabbitmq//
I.e. without the port because the client should default to the standard one.
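As a rough illustration of the port point above, here is a minimal Python sketch that parses a broker URL with the standard library and flags the management port. The function name `check_broker_url` and the placeholder password `PASSWORD` are made up for this example; this is not part of Airflow or Celery, only a sanity check you could run by hand.

```python
from urllib.parse import urlsplit

AMQP_DEFAULT_PORT = 5672    # standard AMQP port, used when the URL has no port
MANAGEMENT_UI_PORT = 15672  # RabbitMQ management web UI (HTTP), not AMQP


def check_broker_url(url):
    """Return (host, effective_port, warning) for an amqp:// style URL.

    Hypothetical helper for illustration only.
    """
    parts = urlsplit(url)
    # urlsplit reports port as None when the URL does not specify one.
    port = parts.port or AMQP_DEFAULT_PORT
    warning = None
    if port == MANAGEMENT_UI_PORT:
        warning = "port 15672 is the management HTTP UI, not AMQP"
    return parts.hostname, port, warning


# PASSWORD is a placeholder, not a real credential.
print(check_broker_url("amqp://guest:PASSWORD@ksaprice_rabbitmq:15672//"))
print(check_broker_url("amqp://guest:PASSWORD@ksaprice_rabbitmq//"))
```

The first call should come back with a warning, while the second resolves to port 5672 because no port is given.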
In the rabbitmqctl report output, we can see that there are two queues:
- default, with 6 messages waiting but no consumers subscribed to it.
- celeryev.b957bbf3-8b97-4633-897f-a887b49e617b, with no messages and one consumer waiting for messages.
So there must be some incorrect or missing configuration on either side (scheduler or worker), because they are not talking to each other currently: tasks arrive on one queue, but the worker is watching another queue.
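To make that reading of the report concrete, here is a small Python sketch that flags queues holding messages with nobody consuming them. The numbers are copied by hand from the "Queues on ksaprice_rabbitmq_vh" section of the report; the `unconsumed` helper is hypothetical and only illustrates the check, it does not query RabbitMQ.

```python
# Values copied by hand from the rabbitmqctl report in the question.
queues = [
    {"name": "default", "messages": 6, "consumers": 0},
    {
        "name": "celeryev.b957bbf3-8b97-4633-897f-a887b49e617b",
        "messages": 0,
        "consumers": 1,
    },
]


def unconsumed(queue_rows):
    """Return names of queues that hold messages but have no consumer."""
    return [
        q["name"]
        for q in queue_rows
        if q["messages"] > 0 and q["consumers"] == 0
    ]


print(unconsumed(queues))
```

Only the default queue is flagged, which matches the symptom: tasks are published there, but no worker is subscribed to it.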