Tracing tasks in Celery 4.1.1 using sample code. Each worker runs:
import logging

import opentracing
from jaeger_client import Config

def get_tracer(service="Vienna"):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
    )
    return config.initialize_tracer() or opentracing.global_tracer()
When I first start celery and run tasks each worker gets a working tracer and there is a log output for each of:
[2019-07-04 19:17:00,527: INFO/ForkPoolWorker-2] Initializing Jaeger Tracer with UDP reporter
[2019-07-04 19:17:00,546: INFO/ForkPoolWorker-2] opentracing.tracer initialized to <jaeger_client.tracer.Tracer object at 0x7f804d079c10>[app_name=SocketIOTask]
Any task that runs after the first gets the global tracer from Config.initialize_tracer (which returns None) and a log warning: Jaeger tracer already initialized, skipping.
Watching tcpdump on the console shows that the UDP packets aren't being sent; I think I'm getting an uninitialized default tracer that uses the no-op reporter.
I've pored over the code in opentracing and jaeger_client and I can't find a canonical way around this.
from jaeger_client import Config

def get_tracer(service="Vienna"):
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
    )
    tracer = config.initialize_tracer()
    if tracer is None:
        # Reset the private guard so a second initialize_tracer() call works.
        Config._initialized = False
        tracer = config.initialize_tracer()
    return tracer
Celery forks multiple worker processes by default (--pool=prefork). The problem is that jaeger_client performs the actual message sending in a separate thread. This thread is created before forking and is not available in the subprocesses. As a result, span reporting from the subprocesses doesn't work.
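The fork-versus-thread interaction can be seen with a stdlib-only sketch (no Celery or jaeger_client involved): a thread started before os.fork() simply does not exist in the forked child, which is exactly what happens to the reporter thread.

```python
# Stdlib-only sketch of the failure mode: a thread started before fork()
# does not survive into the forked child. jaeger_client's UDP reporter
# thread is lost the same way when the prefork pool forks its workers.
import os
import threading
import time

def background():          # stand-in for the tracer's reporter thread
    while True:
        time.sleep(0.1)

threading.Thread(target=background, daemon=True).start()

r, w = os.pipe()
pid = os.fork()
if pid == 0:                       # child: report how many threads survived
    os.close(r)
    os.write(w, str(threading.active_count()).encode())
    os._exit(0)

os.close(w)
child_count = int(os.read(r, 16))  # only the calling thread survives the fork
os.waitpid(pid, 0)
print("child threads:", child_count)
```

In the parent, active_count() still includes the background thread; in the child it is gone, so nothing would ever drain a span queue there.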
The most obvious solution is to use --pool=solo, --pool=threads, etc. But for CPU-bound tasks we still need --pool=prefork, and in that case we hit another issue.
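For reference, the non-forking pools mentioned above are selected on the worker command line (the app module name tasks here is an assumption):

```shell
# Run all tasks in the main process, so the reporter thread survives:
celery -A tasks worker --pool=solo

# Or use a thread pool (still a single process):
celery -A tasks worker --pool=threads --concurrency=8
```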
On the one hand, jaeger_client is designed to be used as a singleton: it opens file descriptors but never closes them (even close() is more of a flush than a close). On the other hand, we need a separate tracer for every Celery process.
In order to solve the issue, I used the following workaround:
import logging
import os

from jaeger_client import Config

tracer = None
tracer_owner_pid = 0

def get_jaeger_tracer():
    global tracer_owner_pid
    global tracer
    pid = os.getpid()
    if tracer_owner_pid == pid:
        return tracer

    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)

    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name='foo',
    )
    tracer = config.new_tracer()
    tracer_owner_pid = pid
    return tracer
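The per-PID caching logic can be exercised without jaeger_client at all; this sketch swaps in a hypothetical make_tracer() stand-in for Config(...).new_tracer() to show that one object is created per process and reused on every later call:

```python
# Per-PID caching pattern from the workaround above, with a dummy
# make_tracer() standing in for Config(...).new_tracer() (an assumption
# for illustration only).
import os

_tracer = None
_tracer_owner_pid = 0

def make_tracer():
    return object()                # placeholder for a real Jaeger tracer

def get_tracer():
    global _tracer, _tracer_owner_pid
    pid = os.getpid()
    if _tracer_owner_pid == pid:   # already built in this process
        return _tracer
    _tracer = make_tracer()        # first call after a fork builds a fresh one
    _tracer_owner_pid = pid
    return _tracer

first = get_tracer()
print(first is get_tracer())       # True: same object within one process
```

Because the cache key is the PID, a forked child fails the `tracer_owner_pid == pid` check on its first call and builds its own tracer, with its own reporter thread, inside the child process.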