I'm tracing tasks in Celery 4.1.1 with jaeger_client, using the sample code. Each worker runs:
import logging
from jaeger_client import Config
import opentracing 
def get_tracer(service="Vienna"):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
    )
    return config.initialize_tracer() or opentracing.global_tracer()
When I first start Celery and run tasks, each worker gets a working tracer and logs the following:
[2019-07-04 19:17:00,527: INFO/ForkPoolWorker-2] Initializing Jaeger Tracer with UDP reporter
[2019-07-04 19:17:00,546: INFO/ForkPoolWorker-2] opentracing.tracer initialized to <jaeger_client.tracer.Tracer object at 0x7f804d079c10>[app_name=SocketIOTask]
Any task that runs after the first one gets the global tracer, because Config.initialize_tracer returns None and logs the warning "Jaeger tracer already initialized, skipping".
Watching tcpdump on the console shows that the UDP packets aren't being sent. I think I'm getting an uninitialized default tracer that uses the noop reporter.
I've pored over the code in opentracing and jaeger_client, and I can't find a canonical way around this.
from jaeger_client import Config
def get_tracer(service="Vienna"):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
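    )
    tracer = config.initialize_tracer()
    if tracer is None:
        # initialize_tracer() returned None because jaeger_client considers
        # itself already initialized. Resetting this private class flag
        # forces a second initialization.
        Config._initialized = False
        tracer = config.initialize_tracer()
    return tracer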
Celery forks multiple processes by default (--pool=prefork). The problem is that jaeger_client performs the actual message sending in a separate thread. This thread is created before forking and is not available in the subprocesses. As a result, tracing from the subprocesses doesn't work.
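The effect can be reproduced without Jaeger or Celery. The following is a minimal sketch using only the standard library on a Unix system; background_sender and thread_alive_after_fork are hypothetical names, and the idle thread is only a stand-in for the reporter thread, not jaeger_client's actual implementation.

```python
import os
import threading


def background_sender(stop_event):
    # Hypothetical stand-in for a reporter thread: idle until told to stop.
    stop_event.wait()


def thread_alive_after_fork():
    """Return (alive_in_parent, alive_in_child) for a thread started before fork."""
    stop_event = threading.Event()
    sender = threading.Thread(
        target=background_sender, args=(stop_event,), daemon=True
    )
    sender.start()
    alive_in_parent = sender.is_alive()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: only the forking thread is copied, so the sender is gone.
        os.close(read_fd)
        os.write(write_fd, b"1" if sender.is_alive() else b"0")
        os.close(write_fd)
        os._exit(0)

    # Parent: read the child's answer, then clean up.
    os.close(write_fd)
    alive_in_child = os.read(read_fd, 1) == b"1"
    os.close(read_fd)
    os.waitpid(pid, 0)
    stop_event.set()
    sender.join()
    return alive_in_parent, alive_in_child
```

Calling thread_alive_after_fork() reports the thread as alive in the parent and not alive in the child, which matches the behaviour described above: anything queued for the background thread in a forked worker is never sent.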
The most obvious solution is to use --pool=solo, --pool=threads, etc. But for CPU-bound tasks we still need --pool=prefork, and in that case we run into another issue.
On the one hand, jaeger_client is designed to be used as a singleton. It opens file descriptors but never closes them (even close() is more of a flush than a close). On the other hand, we need to create a separate tracer for every Celery process.
To solve the issue, I used the following workaround:
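import logging
import os

from jaeger_client import Config

tracer = None
tracer_owner_pid = 0

def get_jaeger_tracer():
    global tracer_owner_pid
    global tracer
    pid = os.getpid()
    # Reuse the cached tracer only if it was created in this process.
    if tracer_owner_pid == pid:
        return tracer
    # Drop the root logger's existing handlers and reconfigure logging.
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name='foo',
    )
    # new_tracer() builds a fresh tracer without touching the global one.
    tracer = config.new_tracer()
    tracer_owner_pid = pid
    return tracer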
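To check that the PID comparison really gives each forked process its own instance, the same pattern can be exercised without Jaeger. This is a sketch under assumptions: PerProcess, make_fake_tracer and child_reuses_parent_instance are hypothetical names, and the dictionary stands in for the object returned by config.new_tracer(). It requires a Unix system because it calls os.fork().

```python
import os


class PerProcess:
    """Cache one object per process, rebuilding it after a fork."""

    def __init__(self, factory):
        self._factory = factory
        self._owner_pid = None
        self._value = None

    def get(self):
        pid = os.getpid()
        # A PID mismatch means the cached value was inherited from a parent.
        if self._owner_pid != pid:
            self._value = self._factory()
            self._owner_pid = pid
        return self._value


def make_fake_tracer():
    # Hypothetical stand-in for config.new_tracer(); records the creating PID.
    return {"created_in": os.getpid()}


def child_reuses_parent_instance(holder):
    """Fork and report whether the child kept the parent's cached object."""
    parent_value = holder.get()
    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        os.close(read_fd)
        same = holder.get()["created_in"] == parent_value["created_in"]
        os.write(write_fd, b"1" if same else b"0")
        os.close(write_fd)
        os._exit(0)
    os.close(write_fd)
    flag = os.read(read_fd, 1)
    os.close(read_fd)
    os.waitpid(pid, 0)
    return flag == b"1"
```

Within one process, repeated get() calls return the same object; in a forked child the first get() call builds a new one, which is the behaviour the workaround relies on.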