I am using pymongo 3.2 and I want to use it with multiprocessing:
client = MongoClient(JD_SEARCH_MONGO_URI, connect=False)
db = client.jd_search

with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    for jd in db['sample_data'].find():
        jdId = jd["jdId"]
        for cv in db["sample_data"].find():
            itemId = cv["itemId"]
            executor.submit(intersect_compute, jdId, itemId)
            # print "done {} => {}".format(jdId, itemId)
but I get this error:

UserWarning: MongoClient opened before fork. Create MongoClient with connect=False, or create client after forking. See PyMongo's documentation for details: http://api.mongodb.org/python/current/faq.html#using-pymongo-with-multiprocessing

According to the documentation, I have set connect to False, as you can see above.
You followed the documentation (the URL in the exception) exactly, but the pattern you copied is from its "Never do this" section.

P.S. I updated your code sample at the end of this answer.
# Each process creates its own instance of MongoClient.
def func():
    db = pymongo.MongoClient().mydb
    # Do something with db.

proc = multiprocessing.Process(target=func)
proc.start()
client = pymongo.MongoClient()

# Each child process attempts to copy a global MongoClient
# created in the parent process. Never do this.
def func():
    db = client.mydb
    # Do something with db.

proc = multiprocessing.Process(target=func)
proc.start()
What you need to change is to initialize the database connection inside each forked process, so that each process gets its own independent connection.
with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
    client = MongoClient(JD_SEARCH_MONGO_URI, connect=False)
    db = client.jd_search

    for jd in db['sample_data'].find():
        jdId = jd["jdId"]
        for cv in db["sample_data"].find():
            itemId = cv["itemId"]
            executor.submit(intersect_compute, jdId, itemId)
            # print "done {} => {}".format(jdId, itemId)