I am using threading and Queue to fetch urls and store them to a database.
I just want one thread to do the storing job.
So I wrote the code below:
import threading
import time
import Queue

site_count = 10
fetch_thread_count = 2
site_queue = Queue.Queue()
proxy_array = []


class FetchThread(threading.Thread):
    def __init__(self, site_queue, proxy_array):
        threading.Thread.__init__(self)
        self.site_queue = site_queue
        self.proxy_array = proxy_array

    def run(self):
        while True:
            index = self.site_queue.get()
            self.get_proxy_one_website(index)
            self.site_queue.task_done()

    def get_proxy_one_website(self, index):
        print '{0} fetched site :{1}\n'.format(self.name, index)
        self.proxy_array.append(index)


def save():
    while True:
        if site_queue.qsize() > 0:
            if len(proxy_array) > 10:
                print 'save :{0} to database\n'.format(proxy_array.pop())
            else:
                time.sleep(1)
        elif len(proxy_array) > 0:
            print 'save :{0} to database\n'.format(proxy_array.pop())
        elif len(proxy_array) == 0:
            print 'break'
            break
        else:
            print 'continue'
            continue


def start_crawl():
    global site_count, fetch_thread_count, site_queue, proxy_array
    print 'init'
    for i in range(fetch_thread_count):
        ft = FetchThread(site_queue, proxy_array)
        ft.setDaemon(True)
        ft.start()

    print 'put site_queue'
    for i in range(site_count):
        site_queue.put(i)

    save()

    print 'start site_queue join'
    site_queue.join()
    print 'finish'

start_crawl()
Executed output:
init
put site_queue
Thread-1 fetched site :0
Thread-2 fetched site :1
Thread-1 fetched site :2
Thread-2 fetched site :3
Thread-1 fetched site :4
Thread-2 fetched site :5
Thread-1 fetched site :6
Thread-2 fetched site :7
Thread-1 fetched site :8
Thread-2 fetched site :9
save :9 to database
save :8 to database
save :7 to database
save :6 to database
save :5 to database
save :4 to database
save :3 to database
save :2 to database
save :1 to database
save :0 to database
break
start site_queue join
finish
[Finished in 1.2s]
Why does the save() function run after site_queue.join(), which is written after save()?
I have also tried substituting save() with a thread function, but it doesn't work either.
Does it mean I must change proxy_array = [] to proxy_queue = Queue.Queue() before I can use threading to store the data?
I just want one thread to do this, and no other thread would get data from proxy_array, so why should I join it? Using a Queue seems very weird.
Is there any better solution?
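If a Queue really is the right tool, here is a rough sketch of what I imagine (the names StoreThread and proxy_queue, and the None sentinel, are my own guesses, not tested against a real database):

import threading
import Queue

proxy_queue = Queue.Queue()

class StoreThread(threading.Thread):
    # the single thread that saves proxies; fetch threads would call
    # proxy_queue.put(proxy) instead of proxy_array.append(proxy)
    def __init__(self, proxy_queue):
        threading.Thread.__init__(self)
        self.proxy_queue = proxy_queue

    def run(self):
        while True:
            proxy = self.proxy_queue.get()   # blocks until data arrives
            if proxy is None:                # sentinel: fetching is finished
                self.proxy_queue.task_done()
                break
            print 'save :{0} to database\n'.format(proxy)
            self.proxy_queue.task_done()

st = StoreThread(proxy_queue)
st.start()
for i in range(10):                          # stand-in for the fetch threads
    proxy_queue.put(i)
proxy_queue.put(None)                        # tell the store thread to stop
proxy_queue.join()

Is this what using a Queue would mean?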
UPDATE:
I don't want to wait until all the FetchThreads complete their work. I want to save data while fetching; it would be much faster.
I want the result to be something like below (because I use proxy_array.pop(), 'save 0' may appear much later; this is just an example for easy understanding):
Thread-2 fetched site :1
Thread-1 fetched site :2
save :0 to database
Thread-2 fetched site :3
Thread-1 fetched site :4
save :2 to database
save :3 to database
Thread-2 fetched site :5
Thread-1 fetched site :6
save :4 to database
.......
UPDATE2 for anyone who has the same questions as below:
question:
As I said in the context above, no other threads would get data from proxy_array.
I just cannot imagine why it would break thread-safety?
answer:
It is the producer-consumer problem in misha's answer; I understood it after reading it carefully.
question:
One more question: can the program's main thread play the consumer together with the FetchThreads (in other words, without needing to create a StoreThread)?
This is what I cannot figure out; I will update after finding the answer.
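To make that second question concrete, here is a minimal sketch of what I mean by the main thread playing the consumer (the fetch logic is reduced to a put, purely illustrative):

import threading
import Queue

site_queue = Queue.Queue()
proxy_queue = Queue.Queue()

def fetch():
    while True:
        index = site_queue.get()
        proxy_queue.put(index)      # hand the result to the main thread
        site_queue.task_done()

for i in range(2):
    t = threading.Thread(target=fetch)
    t.setDaemon(True)
    t.start()

for i in range(10):
    site_queue.put(i)

# the main thread consumes, so no StoreThread is needed
for _ in range(10):
    proxy = proxy_queue.get()       # blocking get, no busy waiting
    print 'save :{0} to database\n'.format(proxy)

site_queue.join()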
I had to come up with something similar: producer-consumer. A producer generates an 'id' and a consumer consumes that id to do a url fetch and process it. Here is my skeleton code which does that:
import Queue
import random
import threading
import time
import sys

data_queue = Queue.Queue()
lock = threading.Lock()


def gcd(a, b):
    while b != 0:
        a, b = b, a % b
    return a  # when the loop ends, b == 0 and a holds the gcd


def consumer(idnum):
    while True:
        try:
            data = data_queue.get(block=False)
        except Exception, e:
            print 'Exception ' + str(e)
        else:
            with lock:
                print('\t consumer %d: computed gcd(%d, %d) = %d' % (
                    idnum, data[0], data[1], gcd(data[0], data[1])))
            time.sleep(1)
            data_queue.task_done()


def producer(idnum, count):
    for i in range(count):
        a, b = random.randint(1, sys.maxint), random.randint(1, sys.maxint)
        with lock:
            print('\t producer %d: generated (%d, %d)' % (idnum, a, b))
        data_queue.put((a, b))
        time.sleep(0.5)


if __name__ == '__main__':
    num_producers = 1
    num_consumers = 2
    num_integer_pairs = 10

    # consumers are daemons, so they will not keep the process alive
    for i in range(num_consumers):
        t = threading.Thread(target=consumer, args=(i,))
        t.daemon = True
        t.start()

    threads = []
    for ii in range(num_producers):
        thread = threading.Thread(target=producer, args=(ii, num_integer_pairs))
        threads.append(thread)
        thread.start()

    # wait for the producer threads to finish
    for thread in threads:
        thread.join()
    print 'done with producer threads'

    # wait till all the jobs are done in the queue
    data_queue.join()

    with lock:
        print 'all consumer threads finished'
    with lock:
        print 'main thread exited'
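One note on the skeleton above: get(block=False) makes the consumers spin and print Empty exceptions whenever the queue is momentarily empty. A gentler variant (my own tweak, a drop-in replacement for consumer() above) blocks with a timeout instead:

def consumer(idnum):
    while True:
        try:
            # wait up to one second for work instead of spinning
            data = data_queue.get(block=True, timeout=1)
        except Queue.Empty:
            continue  # nothing yet; as a daemon this thread dies with main
        else:
            with lock:
                print('\t consumer %d: computed gcd(%d, %d) = %d' % (
                    idnum, data[0], data[1], gcd(data[0], data[1])))
            time.sleep(1)
            data_queue.task_done()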