I've got a Python 3 (3.4.5) project that uses multiprocessing.Pool to run around 50+ jobs through 4 workers. I have a separate process set up with a logging.handlers.QueueListener so I can log global messages to a single file via a Queue created with multiprocessing.Manager(). So basically the flow goes like this:

1. A Queue created via multiprocessing.Manager().
2. A QueueListener listening to that Queue for the global log. (I've also tried running the listener as a thread off of the main program, with the same results.)
3. A multiprocessing.Pool to process the individual jobs, passing each one the Queue created previously plus the config info it needs to run and set up its logging (there's a global log, plus an individual log for each job with more granular info). The jobs are started with map_async.

I keep getting an intermittent error on some of the jobs, though. Usually the error hits 1 of the jobs (a different one each time); occasionally it hits 2 of them, or none at all. As far as I can tell, it's not the code in the jobs that's causing the error, but something in either the multiprocessing or logging setup. Here's an example of the error I'm getting:
--- Logging error ---
Traceback (most recent call last):
File "/usr/lib64/python3.4/logging/handlers.py", line 1347, in emit
self.enqueue(self.prepare(record))
File "/usr/lib64/python3.4/logging/handlers.py", line 1313, in enqueue
self.queue.put_nowait(record)
File "<string>", line 2, in put_nowait
File "/usr/lib64/python3.4/multiprocessing/managers.py", line 731, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 413, in _send_bytes
self._send(chunk)
File "/usr/lib64/python3.4/multiprocessing/connection.py", line 369, in _send
n = write(self._handle, buf)
TypeError: an integer is required (got type NoneType)
Call stack:
File "./sampling__test__py.py", line 100, in <module>
run_pool = multiprocessing.Pool(4)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 118, in Pool
context=self.get_context())
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 168, in __init__
self._repopulate_pool()
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 233, in _repopulate_pool
w.start()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 105, in start
self._popen = self._Popen(self)
File "/usr/lib64/python3.4/multiprocessing/context.py", line 267, in _Popen
return Popen(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 21, in __init__
self._launch(process_obj)
File "/usr/lib64/python3.4/multiprocessing/popen_fork.py", line 77, in _launch
code = process_obj._bootstrap()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 254, in _bootstrap
self.run()
File "/usr/lib64/python3.4/multiprocessing/process.py", line 93, in run
self._target(*self._args, **self._kwargs)
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib64/python3.4/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "/home/username/value_check.py", line 338, in value_check
global_logger.info("SplitTime: {str_timeDelta} -- COMPLETED: {Check_Name} --- Total Txn Count: {var_Total_Txn_Count} --- Criteria Txn Count: {var_Criteria_Txn_Count} --- Threshold: {Threshold} --- Low_Vol Threshold: {LowVolThresh}".format(str_timeDelta = timeDelta(datetime.now() - YAML_Config['start_time']), **YAML_Config))
Message: 'SplitTime: 00:01:05,031 -- COMPLETED: ALPHA_CHECK --- Total Txn Count: 1234--- Criteria Txn Count: 0 --- Threshold: 10 --- Low_Vol Threshold: 0'
Arguments: None
The error refers back to a logging object in my code, but even when I put try/except logic around the call it doesn't do anything; the error appears to be happening upstream. I've also tried changing what is being logged from a formatted string to a simple string, to no avail. It seems like somewhere along the way the individual jobs are either losing their connection to the Queue, or something in the Queue is failing and causing the problems.
Any ideas? I've been working to get a newer version of Python available, which would be beneficial for a number of reasons (f-strings in particular), but I don't know whether that would resolve this issue, and I'm running out of troubleshooting ideas.
Even when I put try/except logic around the call it doesn't do anything.
That's likely because, if the logging package encounters an exception that has to do with logging itself, it will print the traceback but not raise the exception. This is explained more fully in the docstring for logging.Handler.handleError.
One place to start is by setting:
logging.raiseExceptions = True
If the module-level attribute raiseExceptions is False, logging-related exceptions are silently ignored rather than printed.
If that doesn't help, you could put an import pdb; pdb.set_trace() call in the code for .emit(); something like:
def emit(self, record):
    try:
        msg = self.format(record)
        stream = self.stream
        stream.write(msg)
        stream.write(self.terminator)
        self.flush()
    except Exception:
        import pdb; pdb.set_trace()  # <---
        self.handleError(record)
Here record will be a LogRecord instance. Usually when I see a logging error pop up, it's because I've used the wrong number of args for the given format string, but inspecting that record object should hopefully tell you more.
Lastly, there is your logging call itself, from the call stack:
global_logger.info(
    "SplitTime: {str_timeDelta} -- "
    "COMPLETED: {Check_Name} --- "
    "Total Txn Count: {var_Total_Txn_Count} --- "
    "Criteria Txn Count: {var_Criteria_Txn_Count} --- "
    "Threshold: {Threshold} --- "
    "Low_Vol Threshold: {LowVolThresh}".format(
        str_timeDelta=timeDelta(datetime.now() - YAML_Config['start_time']),
        **YAML_Config))
Tough to say exactly what is ultimately raising the exception, since the string does appear to be fully formatted. (Though we can't see YAML_Config.)
Regardless, one recommendation: you can take advantage of logging's "lazy" string formatting rather than str.format() as you currently have it. The str.format() call is evaluated immediately, whereas if you pass the arguments to global_logger.info() directly, the logging package waits to merge them into the message until it must (i.e., until the record is actually emitted).
I have just encountered a similar issue with Python 3.10. I'm writing log messages to a multiprocessing Queue and was randomly getting this:
self.queue.put(obj)
File "<string>", line 2, in put
File "/usr/local/lib/python3.10/multiprocessing/managers.py", line 817, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 411, in _send_bytes
self._send(header + buf)
File "/usr/local/lib/python3.10/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
TypeError: 'NoneType' object cannot be interpreted as an integer
It looks to me like some kind of race condition, and I fixed it with a lock:
with self.lock:
    self.queue.put(msg)
Both the lock and the queue were created with the manager:
self.pool = ProcessPoolExecutor(max_workers=1)
self.manager = multiprocessing.Manager()
self.queue = self.manager.Queue(-1)
self.lock = self.manager.Lock()