 

Celery: The module was not found

Tags: python, celery

I am using Open Semantic Search (OSS) and I would like to monitor its processes using the Flower tool. The workers that Celery needs are described as follows on the OSS website:

The workers will do tasks like analysis and indexing of the queued files. The workers are implemented by etl/tasks.py and will be started automatically on boot by the service opensemanticsearch.

This tasks.py file looks as follows:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

#
# Queue tasks for batch processing and parallel processing
#

# Queue handler
from celery import Celery

# time.sleep is used by the tasks below that accept a wait parameter
import time

# ETL connectors
from etl import ETL
from etl_delete import Delete
from etl_file import Connector_File
from etl_web import Connector_Web
from etl_rss import Connector_RSS


verbose = True
quiet = False

app = Celery('etl.tasks')
app.conf.CELERYD_MAX_TASKS_PER_CHILD = 1

etl_delete = Delete()
etl_web = Connector_Web()
etl_rss = Connector_RSS()


#
# Delete document with URI from index
#

@app.task(name='etl.delete')
def delete(uri):
    etl_delete.delete(uri=uri)


#
# Index a file
#

@app.task(name='etl.index_file')
def index_file(filename, wait=0, config=None):

    if wait:
        time.sleep(wait)

    etl_file = Connector_File()

    if config:
        etl_file.config = config

    etl_file.index(filename=filename)

#
# Index file directory
#

@app.task(name='etl.index_filedirectory')
def index_filedirectory(filename):

    from etl_filedirectory import Connector_Filedirectory

    connector_filedirectory = Connector_Filedirectory()

    result = connector_filedirectory.index(filename)

    return result


#
# Index a webpage
#
@app.task(name='etl.index_web')
def index_web(uri, wait=0, downloaded_file=False, downloaded_headers=[]):

    if wait:
        time.sleep(wait)

    result = etl_web.index(uri, downloaded_file=downloaded_file, downloaded_headers=downloaded_headers)

    return result


#
# Index full website
#

@app.task(name='etl.index_web_crawl')
def index_web_crawl(uri, crawler_type="PATH"):

    import etl_web_crawl

    result = etl_web_crawl.index(uri, crawler_type)

    return result


#
# Index webpages from sitemap
#

@app.task(name='etl.index_sitemap')
def index_sitemap(uri):

    from etl_sitemap import Connector_Sitemap

    connector_sitemap = Connector_Sitemap()

    result = connector_sitemap.index(uri)

    return result


#
# Index RSS Feed
#

@app.task(name='etl.index_rss')
def index_rss(uri):

    result = etl_rss.index(uri)

    return result


#
# Enrich with / run plugins
#

@app.task(name='etl.enrich')
def enrich(plugins, uri, wait=0):

    if wait:
        time.sleep(wait)

    etl = ETL()
    etl.read_configfile('/etc/opensemanticsearch/etl')
    etl.read_configfile('/etc/opensemanticsearch/enhancer-rdf')

    etl.config['plugins'] = plugins.split(',')

    filename = uri

    # strip the protocol prefix file:// if present
    if filename.startswith("file://"):
        filename = filename.replace("file://", '', 1)

    parameters = etl.config.copy()

    parameters['id'] = uri
    parameters['filename'] = filename

    parameters, data = etl.process(parameters=parameters, data={})

    return data


#
# Read command line arguments and start
#

#if running (not imported to use its functions), run main function
if __name__ == "__main__":

    from optparse import OptionParser 

    parser = OptionParser("etl-tasks [options]")
    parser.add_option("-q", "--quiet", dest="quiet", action="store_true", default=False, help="Don\'t print status (filenames) while indexing")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true", default=False, help="Print debug messages")

    (options, args) = parser.parse_args()

    # store_true options are always booleans, so assign directly
    verbose = options.verbose
    etl_delete.verbose = options.verbose
    etl_web.verbose = options.verbose
    etl_rss.verbose = options.verbose

    quiet = options.quiet

    app.worker_main()

I have read multiple tutorials about Celery, and from my understanding this command should do the job:

celery -A etl.tasks flower

but it doesn't. The result is the error

Error: Unable to load celery application. The module etl was not found.

The same happens for

celery -A etl.tasks worker --loglevel=debug

so Celery itself seems to be causing the trouble, not Flower. I also tried, e.g., celery -A etl.index_filedirectory worker --loglevel=debug, but with the same result.

What am I missing? Do I have to somehow tell Celery where to find etl.tasks? Online research doesn't really show a similar case; most of the "module not found" errors seem to occur while importing things. So possibly it's a silly question, but I couldn't find a solution anywhere. I hope you can help me. Unfortunately, I won't be able to respond until Monday, sorry in advance.
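As a side note, celery -A resolves the application through a regular Python import, so this kind of failure can be reproduced without Celery at all. A minimal sketch of such a check (assuming it is run from the same directory Celery is started from):

import importlib

# If this raises ModuleNotFoundError, "celery -A etl.tasks ..." will fail
# the same way, since Celery locates the app module via the standard
# import machinery.
module = importlib.import_module('etl.tasks')
print('found app module at:', module.__file__)

If the import fails too, the working directory or PYTHONPATH is the culprit rather than Celery itself.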

asked May 02 '19 by starsnpixel


2 Answers

Solutions above did not work for me.

I had the same issue. My problem was that in my main celery.py (which was in the SmartCalend folder) I had:

app = Celery('proj')

but instead I must type there:

app = Celery('SmartCalend')

where SmartCalend is the actual name of the package that celery.py lives in: not an arbitrary word, but precisely the package name. This is barely mentioned anywhere except the official docs.

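The pattern from the docs looks roughly like this (a sketch, with proj standing in for the actual package name):

# proj/celery.py
from celery import Celery

# per the answer above, the first argument should match the name of the
# package this module lives in, so that "celery -A proj" can locate the app
app = Celery('proj',
             broker='amqp://',
             backend='rpc://',
             include=['proj.tasks'])

if __name__ == '__main__':
    app.start()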

answered Oct 09 '22 by Artem Chege


I had the same issue. I installed and configured my queue as follows, and now it works.

Install RabbitMQ

macOS

brew install rabbitmq
vim ~/.bash_profile

In .bash_profile, add the following line:

PATH=$PATH:/usr/local/sbin

Then reload .bash_profile:

source ~/.bash_profile

Linux

sudo apt-get install rabbitmq-server

Configure RabbitMQ

Launch the queue:

sudo rabbitmq-server

In another Terminal, configure the queue:

sudo rabbitmqctl add_user myuser mypassword
sudo rabbitmqctl add_vhost myvhost
sudo rabbitmqctl set_user_tags myuser mytag
sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
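
With that user and vhost in place, the app has to point at them through a broker URL of the form amqp://user:password@host:port/vhost. A minimal sketch of a matching task.py (the credentials are the example ones from the rabbitmqctl commands above):

# task.py -- points Celery at the vhost and user created above
from celery import Celery

app = Celery('task',
             broker='amqp://myuser:mypassword@localhost:5672/myvhost')

@app.task
def ping():
    return 'pong'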

Launch Celery

I would suggest going into the folder that contains task.py and using the following command:

celery -A task worker -l info -Q celery --concurrency 5
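
Here -Q celery binds the worker to the queue named celery (Celery's default queue name), -l info sets the log level, and --concurrency 5 starts five worker processes.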

answered Oct 09 '22 by sashaboulouds