Tweepy Connection broken: IncompleteRead - best way to handle exception? or, can threading help avoid?

I am using tweepy to handle a large twitter stream (following 4,000+ accounts). The more accounts that I add to the stream, the more likely I am to get this error:

    Traceback (most recent call last):
      File "myscript.py", line 2103, in <module>
        main()
      File "myscript.py", line 2091, in main
        twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
      File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 445, in filter
        self._start(async)
      File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 361, in _start
        self._run()
      File "C:\Python27\lib\site-packages\tweepy\streaming.py", line 294, in _run
        raise exception
    requests.packages.urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(0 bytes read, 2000 more expected)', IncompleteRead(0 bytes read, 2000 more expected))

Obviously that is a thick firehose, and empirically it is too thick for my script to handle. Based on researching this error on Stack Overflow, as well as the trend that the more accounts I follow, the sooner this exception occurs, my hypothesis is that this is 'my fault': my processing of each tweet takes too long and/or my firehose is too thick. I get that.

But notwithstanding that setup, I still have two questions that I can't seem to find solid answers for.

1. Is there a way to simply 'handle' this exception, accept that I will miss some tweets, and keep the script running? I figure it might miss a tweet (or many tweets), but if I can live without 100% of the tweets I want, then the script/stream can still go on, ready to catch the next tweet whenever it can.

I've tried this exception handling, which was recommended for exactly this situation in a similar question on Stack Overflow:

    from urllib3.exceptions import ProtocolError

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except ProtocolError:
            continue

But unfortunately for me (perhaps I implemented it incorrectly, but I don't think I did), that did not work. I get the exact same error I was previously getting, with or without that recommended exception-handling code in place.
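
For completeness, one variation I have been wondering about but have not verified: my traceback shows the ProtocolError coming from the copy of urllib3 bundled inside requests, so a handler that also catches that class might behave differently. A rough sketch, using the same twitter_stream and USERS_TO_FOLLOW_STRING_LIST as above:

    from urllib3.exceptions import ProtocolError
    from requests.packages.urllib3.exceptions import ProtocolError as RequestsProtocolError

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except (ProtocolError, RequestsProtocolError):
            # accept that some tweets are lost, reconnect, and keep streaming
            continue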

2. I have never implemented queues and/or threading in my Python code. Would this be a good time for me to try to implement that? I don't know everything about queues/threading, but I am imagining...

Could I have the tweets written raw, before any processing, to memory or a database or something, on one thread? And then have a second thread ready to do the processing of those tweets as soon as it can? I figure that way, at least, my post-processing of each tweet is taken out of the equation as a limiting factor on the bandwidth of the firehose I am reading. Then, if I still get the error, I can cut back on who I am following, etc.

I have watched some threading tutorials, but figured it might be worth asking whether that approach even 'works' with the tweepy/Twitter setup. I am not confident in my understanding of the problem I have, or of how threading might help, so I figured I would ask for advice on whether it would indeed help me here.

If this idea is valid, is there a simple piece of example code someone could share to point me in the right direction?

asked Dec 30 '17 by 10mjg

2 Answers

I think I solved this problem by finally completing my first queue/thread implementation. I am not knowledgeable enough to know whether this is the best way to do it, but it does work: using the code below, I now build up a queue of incoming tweets and can process them from the queue at my own pace, rather than falling behind and losing my connection with tweepy.

    import tweepy
    from Queue import Queue        # Python 2 module; in Python 3 it is the lower-case `queue`
    from threading import Thread

    class My_Parser(tweepy.StreamListener):

        def __init__(self, q=Queue()):
            num_worker_threads = 4
            self.q = q
            # start daemon worker threads that drain the queue in the background
            for i in range(num_worker_threads):
                t = Thread(target=self.do_stuff)
                t.daemon = True
                t.start()

        def on_data(self, data):
            # just enqueue the raw tweet so the streaming callback returns immediately
            self.q.put(data)

        def do_stuff(self):
            while True:
                # do_whatever() is a placeholder for your own tweet processing
                do_whatever(self.q.get())
                self.q.task_done()

I did keep digging into the IncompleteRead error for a while and tried numerous other exception-handling solutions using the urllib and httplib libraries, but I struggled with those. And I think the queueing approach has benefits anyway, beyond just keeping the connection alive (for one, you won't lose data).
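
For reference, here is roughly how this listener can be wired into the stream. The credential variables are placeholders for your own keys and tokens, and the outer loop is the same reconnect idea from the question:

    import tweepy
    from requests.packages.urllib3.exceptions import ProtocolError

    # placeholder credentials - substitute your own app keys and tokens
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)

    listener = My_Parser()
    twitter_stream = tweepy.Stream(auth=auth, listener=listener)

    while True:
        try:
            twitter_stream.filter(follow=USERS_TO_FOLLOW_STRING_LIST, stall_warnings=True)
        except ProtocolError:
            # on_data() only enqueues, so this should fire far less often;
            # if it still does, reconnect and keep going
            continue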

Hopefully this is helpful to someone. haha.

answered Oct 28 '22 by 10mjg


Thanks a lot man, I was facing a problem like this and tried all kinds of solutions. It was happening because, besides streaming from the API, I was doing a lot of processing on the data, and that made me lose the connection. I made some adjustments to your approach: I had to add super().__init__() in the __init__ method because I am using on_status, and the Queue import must be lower case (queue) in Python 3. Another thing: I didn't write a do_whatever, I just put self.q.get() inside the while loop. Anyway, it works perfectly, thanks a lot again.

The final code:

    import tweepy
    from queue import Queue        # lower-case module name in Python 3
    from threading import Thread

    class Listener(tweepy.StreamListener):

        def __init__(self, q=Queue()):
            super().__init__()
            self.q = q
            # start four daemon worker threads that drain the queue in the background
            for i in range(4):
                t = Thread(target=self.do_stuff)
                t.daemon = True
                t.start()

        def on_status(self, status):
            <my code here>

        def do_stuff(self):
            while True:
                self.q.get()
                self.q.task_done()
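
One note for anyone adapting this: the <my code here> placeholder is where the incoming status would typically be handed off to the queue so the worker threads have something to drain. A hypothetical version could be as simple as:

    def on_status(self, status):
        # hypothetical body: push the raw status onto the queue and return
        # immediately so the streaming thread never blocks on processing
        self.q.put(status)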

answered Oct 28 '22 by ewertonpaulo