I am using multiprocessing.Pool().
Here is what I want to run in the pool:
    def insert_and_process(file_to_process, db):
        db = DAL("path_to_mysql" + db)
        # Table definitions
        db.table.insert(**parse_file(file_to_process))
        return True

    if __name__ == "__main__":
        file_list = os.listdir(".")
        P = Pool(processes=4)
        P.map(insert_and_process, file_list, db)  # this is where I have the problem
I want to pass two arguments. What I want is to initialize only 4 DB connections. As written, the code tries to create a connection on every function call, so possibly millions of them, which would freeze the I/O to death. If I can create 4 DB connections, one for each process, it will be OK.
Is there any solution for Pool, or should I abandon it?
EDIT:
With help from both of you, I got this:
args=zip(f,cycle(dbs))
Out[-]:
[('f1', 'db1'),
('f2', 'db2'),
('f3', 'db3'),
('f4', 'db4'),
('f5', 'db1'),
('f6', 'db2'),
('f7', 'db3'),
('f8', 'db4'),
('f9', 'db1'),
('f10', 'db2'),
('f11', 'db3'),
('f12', 'db4')]
So here is how it is going to work: I am going to move the DB connection code out to the main level and do this:
    def process_and_insert(args):
        # Table definitions
        # args is a (file_to_process, db_connection) pair
        args[1].table.insert(**parse_file(args[0]))
        return True

    if __name__ == "__main__":
        file_list = os.listdir(".")
        P = Pool(processes=4)
        # range(4) gives the four connections intended above
        dbs = [DAL("path_to_mysql/database") for i in range(4)]
        args = zip(file_list, cycle(dbs))
        P.map(process_and_insert, args)
I am going to test it out and let you guys know.
Pool.map() only supports functions that take a single argument. This means that a function which expects multiple arguments cannot be passed to it directly.
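For what it's worth, on Python 3.3 and later the pool also has a starmap() method that unpacks each tuple of the iterable into positional arguments, so the two-argument function can stay as it is. A minimal sketch, assuming Python 3; the body of insert_and_process and the helper build_jobs are hypothetical stand-ins, not the asker's real code:

```python
from multiprocessing import Pool


def insert_and_process(file_to_process, db_name):
    # Hypothetical stand-in for the real parse-and-insert work:
    # it only reports which file would go to which database.
    return "{} -> {}".format(file_to_process, db_name)


def build_jobs(file_list, db_name):
    # One (file, db_name) tuple per call; starmap unpacks each tuple
    # into the two positional parameters of insert_and_process.
    return [(name, db_name) for name in file_list]


if __name__ == "__main__":
    jobs = build_jobs(["f1", "f2", "f3"], "db1")
    with Pool(processes=2) as pool:
        results = pool.starmap(insert_and_process, jobs)
    print(results)
```

Note that this only solves the argument-passing part; it does nothing about how many DB connections get opened.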
The multiprocessing.pool.Pool process pool provides a version of the map() function where the target function is called in parallel for each item in the provided iterable. The documentation describes it as "a parallel equivalent of the map() built-in function […]. It blocks until the result is ready."
Passing keyword arguments to multiprocessing processes: we can also pass arguments by parameter name using the kwargs parameter of the Process class. Instead of passing a tuple, we pass a dictionary to kwargs that maps each argument name to the value being passed for it.
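A small sketch of the kwargs approach with a single Process, in case it helps. The worker body and the argument values are made up for illustration; the result is sent back through a Queue because a Process has no return value:

```python
from multiprocessing import Process, Queue


def insert_and_process(file_to_process, db_name, results):
    # Hypothetical stand-in for the real insert: report the arguments
    # back to the parent through the queue.
    results.put((file_to_process, db_name))


if __name__ == "__main__":
    results = Queue()
    worker = Process(
        target=insert_and_process,
        # Keys must match the parameter names of the target function.
        kwargs={"file_to_process": "f1", "db_name": "db1", "results": results},
    )
    worker.start()
    print(results.get())  # read the result before joining
    worker.join()
```

This applies to Process only; Pool.map() has no kwargs parameter, so it does not directly answer the Pool question.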
The Pool documentation does not mention a way of passing more than one parameter to the target function. I tried just passing a sequence, but it does not get unpacked (one item of the sequence for each parameter).
However, you can write your target function to expect the first (and only) parameter to be a tuple, in which each element is one of the parameters you are expecting:
    import os
    from itertools import repeat
    from multiprocessing import Pool

    def insert_and_process((file_to_process, db)):  # tuple parameter: Python 2 syntax
        db = DAL("path_to_mysql" + db)
        # Table definitions
        db.table.insert(**parse_file(file_to_process))
        return True

    if __name__ == "__main__":
        file_list = os.listdir(".")
        P = Pool(processes=4)
        # db is the database name, assumed to be defined elsewhere
        P.map(insert_and_process, zip(file_list, repeat(db)))
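(Note the extra parentheses in the definition of insert_and_process: Python treats that as a single parameter that should be a 2-item sequence. The first element of the sequence is assigned to the first variable, and the second element to the second. This tuple-parameter syntax only works in Python 2; it was removed in Python 3, where the tuple has to be unpacked inside the function body instead.)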
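For Python 3, the same single-tuple idea can be written by unpacking inside the function. This is only a sketch: the return value and the build_jobs helper are hypothetical stand-ins for the real DAL insert, which is not reproduced here.

```python
from itertools import repeat
from multiprocessing import Pool


def insert_and_process(args):
    # Python 3 has no tuple parameters, so unpack the pair in the body.
    file_to_process, db_name = args
    # Hypothetical stand-in for the real parse-and-insert work.
    return "{} -> {}".format(file_to_process, db_name)


def build_jobs(file_list, db_name):
    # repeat() yields db_name forever; zip stops when file_list runs out.
    return list(zip(file_list, repeat(db_name)))


if __name__ == "__main__":
    with Pool(processes=2) as pool:
        print(pool.map(insert_and_process, build_jobs(["f1", "f2"], "db1")))
```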
Your pool will spawn four processes, each running in its own instance of the Python interpreter. You can use a global variable to hold your database connection object, so that exactly one connection is created per process:
    global_db = None

    def insert_and_process(file_to_process, db):
        global global_db
        if global_db is None:
            # If this is the first time this function is called within this
            # process, create a new connection. Otherwise, the global variable
            # already holds a connection established by a former call.
            global_db = DAL("path_to_mysql" + db)
        global_db.table.insert(**parse_file(file_to_process))
        return True
Since Pool.map() and friends only support one-argument worker functions, you need to create a wrapper that forwards the work:
    def insert_and_process_helper(args):
        return insert_and_process(*args)

    if __name__ == "__main__":
        file_list = os.listdir(".")
        db = "wherever you get your db"
        # Create argument tuples for each function call:
        jobs = [(file, db) for file in file_list]
        P = Pool(processes=4)
        P.map(insert_and_process_helper, jobs)