I can see similar questions have been asked before but those are running multi processors and not executors. Therefore I am unsure how to fix this.
the GitHub issue also say its resolved in 4.1 https://github.com/celery/celery/issues/1709
I am using
celery==4.1.1
django-celery==3.2.1
django-celery-beat==1.0.1
django-celery-results==1.0.1
My script as as follows, ive tried to cut it down to show relevant code only.
@asyncio.coroutine
def snmp_get(ip, oid, snmp_user, snmp_auth, snmp_priv):
results=[]
snmpEngine = SnmpEngine()
errorIndication, errorStatus, errorIndex, varBinds = yield from getCmd(
...
)
...
for varBind in varBinds:
results.append(' = '.join([x.prettyPrint() for x in varBind]))
snmpEngine.transportDispatcher.closeDispatcher()
return results
def create_link_data_record(link_data):
obj = LinkData.objects.create(
...
)
return 'data polled for {} record {} created'.format(link_data.hostname, obj.id)
async def retrieve_data(link, loop):
from concurrent.futures import ProcessPoolExecutor
executor = ProcessPoolExecutor(2)
poll_interval = 60
results = []
# credentials:
...
print('polling data for {} on {}'.format(hostname,link_mgmt_ip))
# create link data obj
link_data = LinkDataObj()
...
# first poll for speeds
download_speed_data_poll1 = await snmp_get(link_mgmt_ip, down_speed_oid % link_index ,snmp_user, snmp_auth, snmp_priv)
download_speed_data_poll1 = await snmp_get(link_mgmt_ip, down_speed_oid % link_index ,snmp_user, snmp_auth, snmp_priv)
# check we were able to poll
if 'timeout' in str(get_snmp_value(download_speed_data_poll1)).lower():
return 'timeout trying to poll {} - {}'.format(hostname ,link_mgmt_ip)
upload_speed_data_poll1 = await snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
# wait for poll interval
await asyncio.sleep(poll_interval)
# second poll for speeds
download_speed_data_poll2 = await snmp_get(link_mgmt_ip, down_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
upload_speed_data_poll2 = await snmp_get(link_mgmt_ip, up_speed_oid % link_index, snmp_user, snmp_auth, snmp_priv)
# create deltas for speed
down_delta = int(get_snmp_value(download_speed_data_poll2)) - int(get_snmp_value(download_speed_data_poll1))
up_delta = int(get_snmp_value(upload_speed_data_poll2)) - int(get_snmp_value(upload_speed_data_poll1))
...
results.append(await loop.run_in_executor(executor, create_link_data_record, link_data))
return results
def get_link_data():
link_data = LinkTargets.objects.all()
# create loop
loop = asyncio.get_event_loop()
if asyncio.get_event_loop().is_closed():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(asyncio.new_event_loop())
# create tasks
tasks = [asyncio.ensure_future(retrieve_data(link, loop)) for link in link_data]
if tasks:
start = time.time()
done, pending = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
the error below which references the run_in_executor code
[2018-05-24 14:13:00,840: ERROR/ForkPoolWorker-3] Task exception was never retrieved
future: <Task finished coro=<retrieve_data() done, defined at /itapp/itapp/monitoring/jobs/link_monitoring.py:130> exception=AssertionError('daemonic processes are not allowed to have children',)>
Traceback (most recent call last):
File "/itapp/itapp/monitoring/jobs/link_monitoring.py", line 209, in retrieve_data
link_data.last_change = await loop.run_in_executor(executor, timestamp, (link_data.link_target_id, link_data.service_status))
File "/usr/local/lib/python3.6/asyncio/base_events.py", line 639, in run_in_executor
return futures.wrap_future(executor.submit(func, *args), loop=self)
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 466, in submit
self._start_queue_management_thread()
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 427, in _start_queue_management_thread
self._adjust_process_count()
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 446, in _adjust_process_count
p.start()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
Setup 1 Step 1: Add celery.py#N#Inside the “picha” directory, create a new file called celery.py:#N#from __future__ import... 2 Step 2: Import your new Celery app#N#To ensure that the Celery app is loaded when Django starts, add the following code... 3 Step 3: Install Redis as a Celery “Broker” More ...
Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits. pebble uses daemonic processes to keep the parent process from hanging on exit.
Celery Tasks. Celery utilizes tasks, which can be thought of as regular Python functions that are called with Celery. For example, let’s turn this basic function into a Celery task: def add(x, y): return x + y. First, add a decorator: from celery.decorators import task @task(name="sum_two_numbers") def add(x, y): return x + y.
The async context can be imposed upon you by the environment in which you are running your Django code. For example, Jupyter notebooks and IPython interactive shells both transparently provide an active event loop so that it is easier to interact with asynchronous APIs.
Try with Celery 5-devel
pip install git+https://github.com/celery/[email protected]
As per below issue
https://github.com/celery/celery/issues/3884
Celery 5.0 will support asyncio. We currently do not support it.
And then there is also below SO thread on same
How to combine Celery with asyncio?
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With