Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Stop the Twitter stream and return a List of statuses with twitter4j

Using the code example provided by Twitter4j, I'd like to stop the stream after a list of 1,000 statuses has been collected, and return this list. How can I do that?

// Question snippet, reproduced as posted. It opens a filtered Twitter stream
// and appends every received Status to a list; the open problem is how to
// stop the stream and hand that list back to the caller of execute().
// NOTE(review): as posted this does not compile -- `execute` has no parameter
// list, `statuses.size` is missing its parentheses, execute() has no return
// statement, and the class's closing brace is missing.
public class Stream {
    public List<Status> execute throws TwitterException {

    // Accumulates each status handed to the listener's onStatus() below.
    List<Status> statuses = new ArrayList();

    // OAuth configuration; the "bbb" values are presumably placeholders.
    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();

    // Callback object registered on the stream further down.
    StatusListener listener = new StatusListener() {

        // Stores the status; once more than 1000 are held the asker wants
        // execute() to return them, but a `return` here would only leave
        // onStatus(), not the enclosing execute().
        public void onStatus(Status status) {
            statuses.add(status);
            if (statuses.size>1000){
              //return statuses. Obviously that's not the correct place for a return statement...
            }
        }

        // The remaining callbacks only log what they receive.
        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
            System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
        }

        public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
            System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
        }

        public void onScrubGeo(long userId, long upToStatusId) {
            System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }
    };

    // Restrict the stream to statuses matching the tracked keywords.
    FilterQuery fq = new FilterQuery();
    String keywords[] = {"Keyword 1", "Keyword 2"};

    fq.track(keywords);

    // Register the listener, then start the filtered stream.
    // Nothing after this point waits for statuses or returns the list.
    twitterStream.addListener(listener);
    twitterStream.filter(fq);

}
like image 669
seinecle Avatar asked Aug 02 '13 12:08

seinecle


2 Answers

Consider using a BlockingQueue as an intermediary, using the listener to add the Status objects to it.

Once the stream has started you can start taking Statuses from the queue until you have the one thousand you need.

As a starting point, it would look something like the following:

public class Stream {
    /** Number of statuses to collect before the stream is shut down. */
    private static final int TOTAL_TWEETS = 1000;

    /**
     * Starts a filtered stream, blocks until {@code TOTAL_TWEETS} statuses
     * have been received, then shuts the stream down and returns them.
     *
     * <p>The listener only hands statuses over through a bounded queue; the
     * calling thread drains that queue, so the returned list is only ever
     * touched by the caller.
     *
     * @return the collected statuses; fewer than {@code TOTAL_TWEETS} only if
     *         the calling thread was interrupted while waiting (the interrupt
     *         flag is restored in that case)
     * @throws TwitterException declared for the stream set-up calls
     */
    public List<Status> execute() throws TwitterException {
        // skipped for brevity...

        // TODO: You may have to tweak the capacity of the queue, depends on the filter query
        final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000);
        final StatusListener listener = new StatusListener() {

            public void onStatus(Status status) {
                // offer() never blocks the stream's thread; if the queue is
                // full the status is dropped rather than stalling the stream.
                statuses.offer(status);
            }

            // etc...
        };

        final FilterQuery fq = new FilterQuery();
        final String keywords[] = {"Keyword 1", "Keyword 2"};
        fq.track(keywords);

        final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);

        twitterStream.addListener(listener);
        try {
            twitterStream.filter(fq);

            // Collect the 1000 statuses
            while (collected.size() < TOTAL_TWEETS) {
                final Status status = statuses.poll(10, TimeUnit.SECONDS);

                if (status == null) {
                    // TODO: Consider hitting this too often could indicate no further Tweets
                    continue;
                }
                collected.add(status);
            }
        } catch (InterruptedException e) {
            // Stop waiting, keep what has been collected so far and preserve
            // the interrupt for the caller.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the stream's connection and thread, including
            // when the wait is interrupted or set-up fails.
            twitterStream.shutdown();
        }

        return collected;
    }
}
like image 154
Jonathan Avatar answered Nov 16 '22 09:11

Jonathan


It is not a good idea to force an asynchronous piece of code into synchronous mode. Please see https://stackoverflow.com/a/5934656/276263 and check whether you can rework your logic.

However, the below code works as per your requirement.

public class Stream {

  /** Number of statuses to collect before the stream is stopped. */
  private static final int TOTAL_TWEETS = 1000;

  public static void main(String[] args) throws TwitterException {
    Stream stream = new Stream();
    stream.execute();
  }

  /** Guards the status list and is the monitor the caller waits on. */
  private final Object lock = new Object();

  /**
   * Opens a filtered stream and blocks the calling thread until exactly
   * {@code TOTAL_TWEETS} statuses have been received, then shuts the stream
   * down and returns them.
   *
   * <p>The listener runs on the stream's own thread, so every access to the
   * list happens while holding {@code lock}, and the caller receives a copy
   * that the listener can no longer modify.
   *
   * @return the collected statuses; fewer than {@code TOTAL_TWEETS} only if
   *         the calling thread was interrupted while waiting (the interrupt
   *         flag is restored in that case)
   * @throws TwitterException declared for the stream set-up calls
   */
  public List<Status> execute() throws TwitterException {

    final List<Status> statuses = new ArrayList<Status>();

    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true);
    cb.setOAuthConsumerKey("bbb");
    cb.setOAuthConsumerSecret("bbb");
    cb.setOAuthAccessToken("bbb");
    cb.setOAuthAccessTokenSecret("bbb");

    TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
        .getInstance();

    StatusListener listener = new StatusListener() {

      @Override
      public void onStatus(Status status) {
        synchronized (lock) {
          if (statuses.size() >= TOTAL_TWEETS) {
            // Target already reached; ignore statuses that arrive before
            // the stream has actually been shut down.
            return;
          }
          statuses.add(status);
          System.out.println(statuses.size() + ":" + status.getText());
          if (statuses.size() >= TOTAL_TWEETS) {
            // Wake the thread blocked in execute().
            lock.notifyAll();
            System.out.println("unlocked");
          }
        }
      }

      @Override
      public void onDeletionNotice(
          StatusDeletionNotice statusDeletionNotice) {
        System.out.println("Got a status deletion notice id:"
            + statusDeletionNotice.getStatusId());
      }

      @Override
      public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
        System.out.println("Got track limitation notice:"
            + numberOfLimitedStatuses);
      }

      @Override
      public void onScrubGeo(long userId, long upToStatusId) {
        System.out.println("Got scrub_geo event userId:" + userId
            + " upToStatusId:" + upToStatusId);
      }

      @Override
      public void onException(Exception ex) {
        ex.printStackTrace();
      }

      @Override
      public void onStallWarning(StallWarning sw) {
        System.out.println(sw.getMessage());

      }
    };

    FilterQuery fq = new FilterQuery();
    String keywords[] = { "federer", "nadal", "#Salute" };

    fq.track(keywords);

    twitterStream.addListener(listener);

    try {
      twitterStream.filter(fq);

      synchronized (lock) {
        // Re-check the condition in a loop: wait() may return spuriously,
        // and the target may already have been reached before we got here.
        while (statuses.size() < TOTAL_TWEETS) {
          lock.wait();
        }
      }
    } catch (InterruptedException e) {
      // Stop waiting, return what has been collected so far and preserve
      // the interrupt for the caller.
      Thread.currentThread().interrupt();
    } finally {
      System.out.println("returning statuses");
      // Always release the stream's connection and thread.
      twitterStream.shutdown();
    }

    synchronized (lock) {
      // Hand back a snapshot so a late listener callback cannot touch the
      // list the caller is reading.
      return new ArrayList<Status>(statuses);
    }
  }
}
like image 4
krishnakumarp Avatar answered Nov 16 '22 09:11

krishnakumarp