I am running a load test and want to send requests to a server at a specific rate, so I use the RateLimiter module to control the request rate.
But it raises an error:
TypeError: can't pickle _thread.RLock objects
It seems to happen because the Queue object q is used in the worker() function:
q.put(pool.submit(predict, req))
How can I solve this problem?
My demo code:
from multiprocessing import Queue, Process, cpu_count
import time
from ratelimiter import RateLimiter  # pip install ratelimiter
from concurrent import futures

q = Queue()

def predict(req):
    # send a request and return the response
    # ...

def worker(q, req, process_id):
    pool = futures.ThreadPoolExecutor(max_workers=1000)
    rate_limiter = RateLimiter(max_calls=1000, period=1)
    while time.time() < time_end:
        with rate_limiter:
            q.put(pool.submit(predict, req))
        continue

processes = []
for i in range(num_processes):
    p = Process(target=worker, args=(q, req, i))  # req is a request object
    p.start()
    processes.append(p)
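The _thread.RLock object cannot be pickled. It is a synchronization primitive that protects shared resources between threads inside a single process, so it has no meaning in another process. Everything put on a multiprocessing.Queue is pickled, and the Future returned by pool.submit() holds such a lock internally, which is why q.put(pool.submit(predict, req)) fails.
To solve this, put only picklable data on the queue, such as the result of predict(req), rather than the Future itself. If the worker processes also need to synchronize with each other, use a multiprocessing.Lock instead of a _thread.RLock, since it is designed to be shared between processes.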
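The pickling failure can be reproduced without any processes at all. The following is a minimal sketch, assuming a hypothetical predict() that returns a plain dict and a helper try_pickle() that is not part of any library. It tries to pickle a Future and then the Future's result. The exact error text depends on the Python version, so it is printed rather than hard-coded.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor


def predict(req):
    # Hypothetical stand-in for the real request; returns plain, picklable data.
    return {"request": req, "status": 200}


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise a description of the error."""
    try:
        pickle.dumps(obj)
        return None
    except Exception as exc:  # exception text varies between Python versions
        return f"{type(exc).__name__}: {exc}"


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(predict, "ping")
    future_error = try_pickle(future)           # fails: the Future holds a lock
    result_error = try_pickle(future.result())  # succeeds: a plain dict

print("Pickling the Future:", future_error)
print("Pickling the result:", result_error)
```

Since multiprocessing.Queue pickles whatever is passed to put(), the same distinction applies there: the result can be sent to another process, the Future cannot.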
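Here is an example of how you can use multiprocessing.Lock. The worker reads requests from one queue and puts the result of predict(req), not a Future, on a separate results queue:
from multiprocessing import Lock, Queue

lock = Lock()      # created in the parent and passed to every worker
tasks = Queue()    # incoming requests; None is the stop sentinel
results = Queue()  # outgoing results

def worker(tasks, results, lock):
    while True:
        req = tasks.get()
        if req is None:  # sentinel: no more requests
            break
        with lock:  # only one process calls predict() at a time
            result = predict(req)
        # multiprocessing.Queue has no task_done(); only JoinableQueue does
        results.put(result)  # a plain result, which can be pickled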
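For the rate-limited load test in the question, one possible approach is to keep the thread pool inside each worker process and send only finished results through the queue, using Future.add_done_callback(). The following is a rough sketch, not a definitive implementation. The names publish and run, the duration and calls_per_second parameters, and the dummy predict() are all assumptions made for illustration. A plain time.sleep() stands in for RateLimiter so that the sketch needs only the standard library.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process, Queue


def predict(req):
    # Hypothetical stand-in: send the request and return picklable data.
    return {"request": req, "status": 200}


def worker(results, req, duration, calls_per_second):
    """Submit requests at a roughly fixed rate; put only results on the queue."""
    interval = 1.0 / calls_per_second
    deadline = time.time() + duration

    def publish(future):
        # Called once predict() has finished. The Future never leaves this
        # process; only its result is put on the queue.
        results.put(future.result())

    with ThreadPoolExecutor(max_workers=100) as pool:
        while time.time() < deadline:
            pool.submit(predict, req).add_done_callback(publish)
            time.sleep(interval)  # crude pacing in place of RateLimiter
    # Leaving the with-block waits for all in-flight requests to finish.
    results.put(None)  # sentinel: this worker is done


def run(num_processes, req, duration=1.0, calls_per_second=50):
    """Start the workers, drain the queue until every sentinel arrives, then join."""
    results = Queue()
    processes = [
        Process(target=worker, args=(results, req, duration, calls_per_second))
        for _ in range(num_processes)
    ]
    for p in processes:
        p.start()
    collected, finished = [], 0
    while finished < num_processes:
        item = results.get()
        if item is None:
            finished += 1
        else:
            collected.append(item)
    for p in processes:
        p.join()
    return collected
```

To try it, call run(2, "ping") from under an if __name__ == "__main__": guard, which multiprocessing requires on platforms that start child processes by spawning. The queue is drained before join() because a process that still has buffered items in a multiprocessing.Queue may not exit until they are consumed.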