I'm working on a project where I need to retrieve Twitter messages using the TwitterAPI, process them and store them in a database. I am using a producer/consumer setup built on a BlockingQueue, whose parts work as follows:
Here is the Main class:
// Creating shared object
BlockingQueue<TwitterMessage> sharedQueue = new ArrayBlockingQueue<TwitterMessage>(1);
// Creating Producer and Consumer Thread
Thread prodThread = new Thread(new TwitterStreamProducer(sharedQueue));
Thread consThread = new Thread(new TwitterStreamConsumer(sharedQueue));
// Starting producer and Consumer thread
prodThread.start();
consThread.start();
The producer processes the TwitterAPI response and adds the object to the queue.
@Override
public void run() {
    while (true) {
        try {
            message = extractData(); // extract data from TwitterAPI response and return TwitterMessage object
            sharedQueue.put(message);
            System.out.println("Produced: " + message.getTwitterMessage());
        } catch (Exception ex) {
            Logger.getLogger(TwitterStreamProducer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
The consumer looks like this:
private final BlockingQueue<TwitterMessage> sharedQueue;
private TwitterProcessor twitterProcessor;
private TwitterMessage twitterMessage;

public TwitterStreamConsumer(BlockingQueue<TwitterMessage> sharedQueue) {
    this.sharedQueue = sharedQueue;
    twitterProcessor = new TwitterProcessor();
}

@Override
public void run() {
    while (true) {
        try {
            twitterMessage = this.twitterProcessor.process(sharedQueue.take());
            if (twitterMessage.getTwitterMessage().length() > 1) {
                System.out.printf("Consumed: %s\n", twitterMessage.getTwitterMessage());
            }
        } catch (InterruptedException ex) {
            Logger.getLogger(TwitterStreamConsumer.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}
What I would expect to see is the following:
Produced: …twittermessage1…
Consumed: …twittermessage1…
Produced: …twittermessage2…
Consumed: …twittermessage2…
Produced: …twittermessage3…
Consumed: …twittermessage3…
...
However, the result I'm getting is the following:
Produced: …twittermessage1…
Produced: …twittermessage2… <= problem
Consumed: …twittermessage1…
Produced: …twittermessage3…
Consumed: …twittermessage3…
Consumed: …twittermessage3… <= problem
Produced: …twittermessage4… <= problem
Produced: …twittermessage5…
Consumed: …twittermessage5…
As you can see, the Producer and Consumer sometimes overlap: the Producer produces more messages than are consumed. Also, a message sometimes gets consumed twice (or even more than twice).
EDIT1 Here is what is printed out on the console:
Produced: @1StevenGeorgiou thanks for the follow #ff
Processed: follow
Produced: @nmagliozzi6 @_PatrickKealy_ but of course!!!!!
Produced: @taylorgaglia Thanks Tayl 😊 miss you tooo
Processed: tayl miss
Produced: Hate this who to follow tab in #twitter it's shows the most pathetic people you know. Accidently added one I had to act fast to unfollow
Processed: hate follow tabshow pathet peopl accid ad act fast unfollow
EDIT2 As John Vint suggested, I printed out 'System.identityHashCode(sharedQueue.take())' and got the following:
Produced: …
Consumed: 1206857787
Produced: …
Consumed: 1206857787
…
Can someone help me figure out how to fix this?
Thanks!
The code is behaving as it should: the order in which threads execute is undefined. It is therefore entirely possible, and even likely, that the producer produces more than one message before the previous one has been processed. This is even a desirable feature, as it allows you to have several threads handling the fetching (the producers), which takes some time because it blocks, and fewer consumers, or even a single one, actually processing those intermediate results.
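To make the point about ordering concrete, here is a minimal sketch (not the asker's code). The class name HandOffDemo and the plain String messages are assumptions made for illustration. It pushes distinct messages through a capacity-1 queue: the "Produced"/"Consumed" lines may interleave differently on each run, because the print statements are not synchronized with the queue operations, yet every message should still arrive exactly once and in order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of the original code.
class HandOffDemo {

    // Sends `count` distinct messages through a capacity-1 queue and returns
    // them in the order in which the consumer received them.
    static List<String> transfer(int count) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= count; i++) {
                    String message = "message" + i; // a new object per message
                    queue.put(message);
                    // The consumer may already have taken (and printed) the
                    // message by the time this line runs.
                    System.out.println("Produced: " + message);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    String message = queue.take();
                    consumed.add(message); // only this thread writes to the list
                    System.out.println("Consumed: " + message);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join(); // after join, the consumer's writes are visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(transfer(5));
    }
}
```

The console interleaving says nothing about what the queue actually delivered; the returned list does.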
However, your code violates a basic premise of the producer/consumer pattern, which is that the number of producers and the number of consumers should differ. As you currently have one producer/consumer pair for each message, the pattern only slows things down. You should either increase the number of fetchers (and accept that the processing is asynchronous) or, if you don't want asynchronous processing, remove the pattern altogether and have the "consumers" fetch the messages on their own.
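As a rough sketch of the "more fetchers" option, the following uses several producer threads feeding a single consumer. Everything here is illustrative: the class name ManyFetchersDemo, the DONE sentinel, the queue capacity of 16, and the string concatenation standing in for the real TwitterAPI call and processing step are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original code.
class ManyFetchersDemo {

    // Sentinel each fetcher puts on the queue when it has finished.
    private static final String DONE = "__done__";

    // Starts `fetchers` producer threads and processes everything they
    // produce on the calling thread, which acts as the single consumer.
    static List<String> run(int fetchers, int messagesPerFetcher) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(16);
        List<Thread> threads = new ArrayList<>();

        for (int f = 0; f < fetchers; f++) {
            final int id = f;
            Thread t = new Thread(() -> {
                try {
                    for (int i = 0; i < messagesPerFetcher; i++) {
                        // Stands in for a slow, blocking API call.
                        queue.put("fetcher" + id + "-msg" + i);
                    }
                    queue.put(DONE);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }

        List<String> processed = new ArrayList<>();
        int finished = 0;
        try {
            // Keep consuming until every fetcher has signalled completion.
            while (finished < fetchers) {
                String message = queue.take();
                if (DONE.equals(message)) {
                    finished++;
                } else {
                    processed.add("processed:" + message);
                }
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4).size() + " messages processed");
    }
}
```

Messages from different fetchers arrive interleaved in no particular order, which is the asynchrony the answer asks you to accept.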
Edit: You could also try another concurrent queue such as LinkedBlockingQueue. Note, though, that ArrayBlockingQueue is itself thread-safe, so swapping the queue alone may not change the output.
Also have a look at the ExecutorService interface, which simplifies threading with Runnables a lot.
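For reference, a minimal sketch of what the ExecutorService variant might look like. The class name ExecutorDemo, the STOP sentinel and the String messages are assumptions for illustration; the tasks are written as Callables so that put() and take() can throw InterruptedException without a try/catch inside each task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the original code.
class ExecutorDemo {

    // Sentinel telling the consumer that no more messages will arrive.
    private static final String STOP = "__stop__";

    static List<String> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Void> producer = () -> {
                for (int i = 1; i <= count; i++) {
                    queue.put("message" + i);
                }
                queue.put(STOP);
                return null;
            };
            Callable<List<String>> consumer = () -> {
                List<String> out = new ArrayList<>();
                for (String m = queue.take(); !STOP.equals(m); m = queue.take()) {
                    out.add(m);
                }
                return out;
            };

            pool.submit(producer);
            Future<List<String>> result = pool.submit(consumer);
            return result.get(); // blocks until the consumer has seen STOP
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            pool.shutdownNow(); // release the pool's threads
        }
    }

    public static void main(String[] args) {
        System.out.println(run(5));
    }
}
```

Compared with creating Thread objects by hand, the pool manages thread lifetimes and the Future gives the caller a direct way to wait for the consumer's result.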