I am using tweepy to handle a large Twitter stream (following 4,000+ accounts). The more accounts I add to the stream, the more likely I am to get this error:
    Traceback (most recent call last):
      File "myscript.py", line 2103, in <module>
        main()
      File "myscript.py", line 2091, in main
        twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
      File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 445, in filter
        self._start(async)
      File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 361, in _start
        self._run()
      File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 294, in _run
        raise exception
    requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 2000 more expected)', IncompleteRead(0 bytes read, 2000 more expected))
Obviously that is a thick firehose, and empirically it's too thick for my script to handle. Based on researching this error on Stack Overflow, as well as the empirical trend that the more accounts I follow, the faster the exception occurs, my hypothesis is that this is "my fault": my processing of each tweet takes too long, and/or my firehose is too thick. I get that.
But notwithstanding that setup, I still have two questions that I can't seem to find solid answers for.
1. Is there a way to simply 'handle' this exception, accept that I will miss some tweets, but keep the script running? I figure it may miss a tweet (or many tweets), but if I can live without 100% of the tweets I want, then the script/stream can still go on, ready to catch the next tweet whenever it can.
I've tried this exception handling, which was recommended for that in a similar question on Stack Overflow:

    from urllib3.exceptions import ProtocolError

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except ProtocolError:
            continue
But unfortunately for me (perhaps I implemented it incorrectly, though I don't think I did), that did not work. I get the exact same error I was previously getting, with or without the recommended exception-handling code in place.
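One possible explanation for why the handler never fires: the traceback shows `requests.packages.urllib3.exceptions.ProtocolError` (the copy of urllib3 vendored inside requests), which in some requests versions is a *different class* from the top-level `urllib3.exceptions.ProtocolError` that the `except` clause imports, so the exception sails past it. This is a sketch of the retry loop with that caveat in mind; the stand-in `ProtocolError` class and the `fake_filter` function are made up here so the loop's behavior can be demonstrated without a live stream (in the real script you would catch both the top-level and the `requests.packages` variant, and call `twitter_stream.filter(...)` instead):

```python
# Stand-in for urllib3.exceptions.ProtocolError (and its vendored twin,
# requests.packages.urllib3.exceptions.ProtocolError -- in the real script,
# catch both, since they can be distinct classes).
class ProtocolError(Exception):
    pass

attempts = {"n": 0}

def fake_filter():
    """Simulates twitter_stream.filter(): fails twice, then streams."""
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ProtocolError("Connection broken: IncompleteRead")
    return "streaming"

def run_forever():
    # The reconnect loop from the question: swallow the error and retry.
    while True:
        try:
            return fake_filter()  # real code: twitter_stream.filter(...)
        except ProtocolError:
            continue

result = run_forever()
print(result, attempts["n"])  # → streaming 3
```

Note that an unconditional `continue` reconnects immediately; in production some backoff between retries is usually kinder to the API.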
2. Could I have the tweets written raw, pre-processing, to memory or a database or something, on one thread? And then have a second thread ready to process those tweets as soon as it can? I figure that way, at least, my post-processing of each tweet is taken out of the equation as a limiting factor on the bandwidth of the firehose I am reading. Then, if I still get the error, I can cut back on who I am following, etc.
I have watched some threading tutorials, but figured it might be worth asking whether that approach 'works' with this tweepy/Twitter complex. I am not confident in my understanding of the problem, or of how threading might help, so I figured I would ask for advice on whether it would indeed help me here.
If this idea is valid, could someone point me in the right direction with a simple piece of example code?
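As a rough illustration of the "write raw first, process later" half of the idea, here is a sketch that persists each raw tweet payload to an append-only file immediately, and processes the saved records at leisure. The file path, the JSON-lines format, and the `save_raw`/`process_saved` names are all made up for illustration, not tweepy API:

```python
import json
import os
import tempfile

# Append-only file standing in for "memory, or a database, or something".
path = os.path.join(tempfile.mkdtemp(), "raw_tweets.jsonl")

def save_raw(data):
    # Writer side: dump the raw JSON string to disk and return at once,
    # so the streaming callback stays as fast as possible.
    with open(path, "a") as f:
        f.write(data + "\n")

def process_saved():
    # Reader side: do the slow processing whenever convenient.
    with open(path) as f:
        return [json.loads(line)["id"] for line in f]

save_raw(json.dumps({"id": 1, "text": "hi"}))
save_raw(json.dumps({"id": 2, "text": "yo"}))
print(process_saved())  # → [1, 2]
```

The same decoupling can be done in memory with a `Queue`, which is the approach the accepted answer below takes.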
I think I solved this problem by finally completing my first queue/thread implementation. I am not learned enough to know the best way to do this, but I think this way does work. Using the code below, I now build up a queue of new tweets and can handle them from the queue as I wish, rather than falling behind and losing my connection with tweepy.
    from Queue import Queue
    from threading import Thread

    class My_Parser(tweepy.StreamListener):

        def __init__(self, q=Queue()):
            num_worker_threads = 4
            self.q = q
            for i in range(num_worker_threads):
                t = Thread(target=self.do_stuff)
                t.daemon = True
                t.start()

        def on_data(self, data):
            self.q.put(data)

        def do_stuff(self):
            while True:
                do_whatever(self.q.get())
                self.q.task_done()
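The same producer/consumer pattern can be exercised end to end in a small self-contained demo, with tweepy and the listener class replaced by a bare queue and plain strings standing in for tweets (Python 3 names, so `queue` instead of `Queue`; the `.upper()` call is a stand-in for whatever `do_whatever` does):

```python
import queue
from threading import Thread

q = queue.Queue()
results = []

def worker():
    # Same loop as do_stuff() above: block on the queue, process, repeat.
    while True:
        item = q.get()
        results.append(item.upper())  # stand-in for do_whatever(tweet)
        q.task_done()

for _ in range(4):        # same four workers as the listener spawns
    t = Thread(target=worker)
    t.daemon = True       # workers die with the main thread
    t.start()

for tweet in ["hello", "world"]:  # stand-in for on_data(data) calls
    q.put(tweet)

q.join()                  # block until every queued item is processed
print(sorted(results))    # → ['HELLO', 'WORLD']
```

The key property is that `q.put` returns immediately, so the streaming callback never blocks on slow processing; the workers drain the backlog in the background.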
I did continue digging about the IncompleteRead error for a while, and I tried numerous more exception-handling solutions using the url and http libraries, but I struggled with that. And I think there may be some benefits to the queueing approach anyway, beyond just keeping the connection alive (for one, you won't lose data).
Hopefully this is helpful to someone. haha.
Thank you a lot, man, I was facing a problem like this and tried all kinds of solutions. It was happening because, besides streaming from the API, I was doing a lot of processing on the data, and this made me lose the connection. I made some adjustments the way you did: I had to add super().__init__() in the __init__ method because I'm using on_status, and the Queue import must be lowercase (queue) in Python 3. Another thing: I didn't write a do_whatever, I just put self.q.get() inside the while loop. Anyway, it works perfectly, thanks a lot again.
The final code:
    from queue import Queue
    from threading import Thread

    class Listener(tweepy.StreamListener):

        def __init__(self, q=Queue()):
            super().__init__()
            self.q = q
            for i in range(4):
                t = Thread(target=self.do_stuff)
                t.daemon = True
                t.start()

        def on_status(self, status):
            <my code here>

        def do_stuff(self):
            while True:
                self.q.get()
                self.q.task_done()