Using the code example provided by Twitter4j, I'd like to stop the stream after 1,000 statuses have been collected and return them as a list. How can I do that?
public class Stream {
public List<Status> execute() throws TwitterException {
final List<Status> statuses = new ArrayList<Status>();
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("bbb");
cb.setOAuthConsumerSecret("bbb");
cb.setOAuthAccessToken("bbb");
cb.setOAuthAccessTokenSecret("bbb");
TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.add(status);
if (statuses.size() > 1000) {
//return statuses. Obviously that's not the correct place for a return statement...
}
}
public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {
System.out.println("Got a status deletion notice id:" + statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
System.out.println("Got track limitation notice:" + numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
System.out.println("Got scrub_geo event userId:" + userId + " upToStatusId:" + upToStatusId);
}
public void onException(Exception ex) {
ex.printStackTrace();
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = {"Keyword 1", "Keyword 2"};
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
}
}
Consider using a BlockingQueue as an intermediary, with the listener adding the Status objects to it.
Once the stream has started, you can take Statuses from the queue until you have the one thousand you need.
As a starting point, it would look something like the following:
public class Stream {
private static final int TOTAL_TWEETS = 1000;
public List<Status> execute() throws TwitterException {
// skipped for brevity...
// TODO: You may have to tweak the capacity of the queue, depends on the filter query
final BlockingQueue<Status> statuses = new LinkedBlockingQueue<Status>(10000);
final StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.offer(status); // Add received status to the queue
}
// etc...
};
final FilterQuery fq = new FilterQuery();
final String keywords[] = {"Keyword 1", "Keyword 2"};
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
// Collect the 1000 statuses
final List<Status> collected = new ArrayList<Status>(TOTAL_TWEETS);
while (collected.size() < TOTAL_TWEETS) {
// TODO: Handle InterruptedException
final Status status = statuses.poll(10, TimeUnit.SECONDS);
if (status == null) {
// TODO: Hitting this timeout repeatedly could indicate that no further Tweets are arriving
continue;
}
collected.add(status);
}
twitterStream.shutdown();
return collected;
}
}
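The loop above leaves two TODOs open: handling InterruptedException and deciding what to do when the queue stays empty. Below is a minimal sketch of one way to handle both. It uses a plain producer thread as a stand-in for the Twitter4j stream, so it runs without credentials. The names QueueCollectDemo, collect and maxEmptyPolls are made up for illustration and are not part of Twitter4j.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueCollectDemo {

    // Takes up to `total` items from the queue. Gives up after `maxEmptyPolls`
    // consecutive timed-out polls, so a quiet stream cannot block forever.
    // (Hypothetical helper, not a Twitter4j API.)
    public static <T> List<T> collect(BlockingQueue<T> queue, int total, int maxEmptyPolls) {
        List<T> collected = new ArrayList<T>(total);
        int emptyPolls = 0;
        try {
            while (collected.size() < total && emptyPolls < maxEmptyPolls) {
                T item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item == null) {
                    emptyPolls++;      // nothing arrived within the timeout
                } else {
                    emptyPolls = 0;    // reset on every successful take
                    collected.add(item);
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what we have so far
            Thread.currentThread().interrupt();
        }
        return collected;
    }

    public static void main(String[] args) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10000);

        // Stand-in for the streaming thread that would call onStatus(...)
        Thread producer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 1500; i++) {
                    queue.offer("status " + i);
                }
            }
        });
        producer.setDaemon(true);
        producer.start();

        List<String> result = collect(queue, 1000, 5);
        System.out.println("Collected " + result.size() + " items");
    }
}
```

In the real code, the place where collect returns is where you would call twitterStream.shutdown(). The caller can compare the size of the returned list with the requested total to tell a full batch from an early give-up.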
It is generally not a good idea to force asynchronous code into a synchronous mode; see https://stackoverflow.com/a/5934656/276263 and consider whether you can rework your logic.
That said, the code below does what you asked. Note that it stops once more than 100 statuses have arrived; raise the threshold in onStatus for your case.
public class Stream {
public static void main(String[] args) throws TwitterException {
Stream stream = new Stream();
stream.execute();
}
private final Object lock = new Object();
public List<Status> execute() throws TwitterException {
final List<Status> statuses = new ArrayList<Status>();
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.setDebugEnabled(true);
cb.setOAuthConsumerKey("bbb");
cb.setOAuthConsumerSecret("bbb");
cb.setOAuthAccessToken("bbb");
cb.setOAuthAccessTokenSecret("bbb");
TwitterStream twitterStream = new TwitterStreamFactory(cb.build())
.getInstance();
StatusListener listener = new StatusListener() {
public void onStatus(Status status) {
statuses.add(status);
System.out.println(statuses.size() + ":" + status.getText());
if (statuses.size() > 100) {
synchronized (lock) {
lock.notify();
}
System.out.println("unlocked");
}
}
public void onDeletionNotice(
StatusDeletionNotice statusDeletionNotice) {
System.out.println("Got a status deletion notice id:"
+ statusDeletionNotice.getStatusId());
}
public void onTrackLimitationNotice(int numberOfLimitedStatuses) {
System.out.println("Got track limitation notice:"
+ numberOfLimitedStatuses);
}
public void onScrubGeo(long userId, long upToStatusId) {
System.out.println("Got scrub_geo event userId:" + userId
+ " upToStatusId:" + upToStatusId);
}
public void onException(Exception ex) {
ex.printStackTrace();
}
@Override
public void onStallWarning(StallWarning sw) {
System.out.println(sw.getMessage());
}
};
FilterQuery fq = new FilterQuery();
String keywords[] = { "federer", "nadal", "#Salute" };
fq.track(keywords);
twitterStream.addListener(listener);
twitterStream.filter(fq);
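try {
synchronized (lock) {
// Re-check the condition in a loop to guard against spurious wakeups
// and against a notify() that fired before we started waiting
while (statuses.size() <= 100) {
lock.wait();
}
}
} catch (InterruptedException e) {
// Restore the interrupt flag so callers can see the interruption
Thread.currentThread().interrupt();
}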
System.out.println("returning statuses");
twitterStream.shutdown();
return statuses;
}
}
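A CountDownLatch is an alternative to hand-written wait/notify for this kind of "block until N events have arrived" handoff, because it remembers counts that arrive before the waiting thread starts waiting. The following is a rough sketch under that assumption, not a drop-in replacement. The class LatchCollectDemo and its methods onItem and awaitItems are hypothetical names, with onItem playing the role of onStatus. It uses no Twitter4j types, so it can be run on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchCollectDemo {
    private final int target;
    private final List<String> items = new ArrayList<String>();
    private final CountDownLatch latch;

    public LatchCollectDemo(int target) {
        this.target = target;
        this.latch = new CountDownLatch(target);
    }

    // Plays the role of onStatus(...): called from the streaming thread.
    public void onItem(String item) {
        synchronized (items) {
            if (items.size() >= target) {
                return; // already full, ignore late arrivals
            }
            items.add(item);
        }
        latch.countDown(); // one count per stored item
    }

    // Blocks until `target` items have arrived or the timeout elapses,
    // then returns a snapshot of what was collected.
    public List<String> awaitItems(long timeout, TimeUnit unit) {
        try {
            latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt
        }
        synchronized (items) {
            return new ArrayList<String>(items);
        }
    }

    public static void main(String[] args) {
        final LatchCollectDemo demo = new LatchCollectDemo(100);

        // Stand-in for the Twitter4j streaming thread
        Thread producer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < 250; i++) {
                    demo.onItem("status " + i);
                }
            }
        });
        producer.start();

        List<String> result = demo.awaitItems(10, TimeUnit.SECONDS);
        System.out.println("Collected " + result.size() + " items");
    }
}
```

Guarding the list with a lock also avoids sharing an unsynchronized ArrayList between the listener thread and the caller. The timeout on await keeps the caller from blocking indefinitely if the stream goes quiet.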