I am trying to use TensorFlow in a Celery worker. I encountered a timeout instead of receiving a response from the worker.
I used following code:
tasks.py
from celery import Celery
from celery.signals import worker_init
import tensorflow as tf

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')

class TFModel():
    def __init__(self):
        self.sess = tf.Session()

    def run(self):
        return self.sess.run(tf.constant('hello'))

tf_model = None

@worker_init.connect
def on_worker_init(**_):
    global tf_model
    tf_model = TFModel()
    print(tf_model.run())
    return

@app.task(time_limit=10)
def run():
    return tf_model.run()
test.py
import time
from tasks import run

r = run.delay()
while not r.ready():
    time.sleep(2)
print(r.get())
I started a worker with this command:
$ celery -A tasks worker -l info -c 1
When the worker started, hello was printed out, since on_worker_init() calls print(tf_model.run()). This means that TensorFlow works properly.
Then, I ran:
$ python test.py
and got:
celery.backends.base.TimeLimitExceeded: TimeLimitExceeded(10,)
What went wrong? How can I investigate what happened?
My environment is:
python 3.5.1
tensorflow 0.11.0
celery 4.0.2
Thanks.
Try this. Two things matter here: the model is built in worker_process_init, which is dispatched inside every forked pool process, so the session lives in the process that actually runs the tasks (worker_init fires in the parent before the pool forks, and a TensorFlow session generally does not survive a fork); and asynpool.PROC_ALIVE_TIMEOUT is raised, because the prefork pool only waits a few seconds for each child process to come up, which is not enough when the child has to load a model first:
import tensorflow as tf
from celery import Celery
from celery.utils.log import get_task_logger
from celery.signals import worker_init, worker_process_init
from models import Network, Extractor
from celery.concurrency import asynpool

asynpool.PROC_ALIVE_TIMEOUT = 100.0  # set this long enough

logger = get_task_logger(__name__)

CELERY_BROKER_URL = 'redis://localhost:6379/'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/'

# Celery: Distributed Task Queue
app = Celery('tasks', backend=CELERY_RESULT_BACKEND, broker=CELERY_BROKER_URL)
app.conf.task_serializer = 'json'
app.conf.result_serializer = 'json'

tf_model = None

@worker_process_init.connect()
def on_worker_init(**_):
    global tf_model
    # Create server with model
    logger.info('model for worker: started init')
    print("model for dsa")
    session = tf.Session()
    model = Network(session, True)
    # model.load_model('./models/test_2')
    extractor = Extractor(model)
    tf_model = extractor
    logger.info('model for worker: initialized')

@app.task(name='process_single')
def process_single(image):
    logger.info('process_single: started')
    descriptor = tf_model.process_single(image)
    logger.info('process_single: completed')
    return descriptor
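To call this worker from a client, something like the following should work. This is only a sketch: the task is addressed by its registered name, and the image argument here is a hypothetical placeholder payload that happens to be JSON-serializable, since both serializers are set to json.

import time
from celery import Celery

app = Celery('tasks', backend='redis://localhost:6379/', broker='redis://localhost:6379/')

# Send by registered task name; the args payload is a placeholder
# stand-in for whatever JSON-serializable form your images take.
result = app.send_task('process_single', args=[[0.0] * 128])
while not result.ready():
    time.sleep(1)
print(result.get())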
It looks to me like this is working:
[2017-01-21 09:41:18,892: INFO/Worker-1] ???[???]: model for worker: started init
[2017-01-21 09:41:18,893: WARNING/Worker-1] model for dsa
[2017-01-21 09:41:18,902: INFO/MainProcess] Connected to redis://localhost:6379//
[2017-01-21 09:41:18,915: INFO/MainProcess] mingle: searching for neighbors
[2017-01-21 09:41:19,920: INFO/MainProcess] mingle: all alone
[2017-01-21 09:41:19,949: WARNING/MainProcess] celery@cospelpc ready.
[2017-01-21 09:41:20,930: INFO/Worker-1] ???[???]: model for worker: initialized
[2017-01-21 09:41:31,648: INFO/MainProcess] Received task: process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]
[2017-01-21 09:41:31,658: INFO/Worker-1] process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]: process_single: started
[2017-01-21 09:41:33,125: INFO/Worker-1] process_single[024068ba-9ea2-4405-8aab-d3504a06aa55]: process_single: completed
[2017-01-21 09:41:33,128: INFO/MainProcess] Task process_single[024068ba-9ea2-4405-8aab-d3504a06aa55] succeeded in 1.470330449s: [153608.4375, 0.0, 0.0, 243285.75, 0.0, 155679.671875, 346120.625, 70663.265625, 0.0, 29445.03125, 0.0, 518396.25, 0.0,...
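Applied to the tasks.py from the question, the same fix would look roughly like this. This is an untested sketch: it only moves the model construction to worker_process_init, raises the pool start-up timeout, and decodes the bytes result so the JSON result backend can serialize it.

from celery import Celery
from celery.signals import worker_process_init
from celery.concurrency import asynpool
import tensorflow as tf

# Give each forked pool process time to build its session before the
# master gives up on it (the default timeout is only a few seconds).
asynpool.PROC_ALIVE_TIMEOUT = 100.0

app = Celery('tasks', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0')

class TFModel():
    def __init__(self):
        self.sess = tf.Session()

    def run(self):
        # sess.run returns bytes here; decode so the json result
        # serializer can encode the return value
        return self.sess.run(tf.constant('hello')).decode('utf-8')

tf_model = None

@worker_process_init.connect
def on_worker_process_init(**_):
    # Runs inside each forked child, so the session is created in the
    # same process that will execute the task (worker_init runs in the
    # parent, before forking).
    global tf_model
    tf_model = TFModel()

@app.task(time_limit=10)
def run():
    return tf_model.run()

With this, test.py from the question should get a result back instead of hitting the time limit.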