Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

SubmissionPublisher on submit not invoking onNext of subscriber

Every interval I retrieve tweets with a certain query. These tweets have to be passed to services which calculate and manipulate those tweets. So these services are subscribed to my publisher. So publisher.hasSubscribers() returns true. But the submit or offer function does not invoke the onNext of my subscriber. So as a "fix", I cycle through my subscribers and invoke it myself. But that shouldn't be the case.

This is the constructor of my publisher.

 public TwitterStreamer(Executor executor, int maxBufferCapacity, long period, TimeUnit unit, String searchQuery){
    super(executor, maxBufferCapacity);
    this.searchQuery = searchQuery;
    scheduler = new ScheduledThreadPoolExecutor(1);
    this.tweetGetter = scheduler.scheduleAtFixedRate(
            () -> {
               List<String> tweets = getTweets(searchQuery);
               /* this.lastCall = LocalDateTime.now();
                for(Flow.Subscriber sub : this.getSubscribers()){
                    sub.onNext(tweets);
                }*/
               this.submit(tweets);
                if(tweets.size() >= 20) this.close();
            }, 0, period, unit);
}

This is my subscriber

    package myFlowAPI;

import Interfaces.IProcess;
import Services.LogToFileService;

import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

public class MySubscriber implements Flow.Subscriber<List<String>> {
private Flow.Subscription subscription;
private AtomicInteger count;

private IProcess processor;

private String name;
private int DEMAND = 0;

public MySubscriber(String name, IProcess processor){
    this.name = name;
    this.processor = processor;
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
}


@Override
public void onNext(List<String> item) {
    Object result = this.processor.process(item);
    this.readResult(result);

    switch (this.processor.getClass().getSimpleName()){
        case "CalculateTweetStatsService":
            if((Integer) result >= 20){
                this.subscription.cancel();
            }
            break;
    }
}

@Override
public void onError(Throwable throwable) {
    System.out.println("Error is thrown " + throwable.getMessage());
}

@Override
public void onComplete() {
    if(this.processor instanceof LogToFileService){
        ((LogToFileService) processor).closeResource();
    }
    System.out.println("complete");
}

private void readResult(Object result){
    System.out.println("Result of " + this.processor.getClass().getSimpleName() + " processor is " + result.toString());
}
}

This is the main where I subscribe to the publisher

public static void main(String[] args) {
    ScheduledExecutorService  executor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());

    String searchQuery;
    try{
       searchQuery = args[0] != null ? args[0] : "#capgemini50";
    }catch (ArrayIndexOutOfBoundsException ex){
        searchQuery = "#capgemini50";
    }

    TwitterStreamer streamer = new TwitterStreamer(executor, 5, 15L, SECONDS, searchQuery);

    MySubscriber subscriber1 = new MySubscriber("LogFileSubscriber", new LogToFileService("./tweetsLogger.txt"));
    MySubscriber subscriber2 = new MySubscriber("TotalTweetSubscriber",new CalculateTweetStatsService());
    streamer.subscribe(subscriber1);
    streamer.subscribe(subscriber2);

}
like image 582
user1008531 Avatar asked Oct 12 '17 15:10

user1008531


1 Answers

You need the subscriber to explicitly request data e.g. upon subscription (see https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Subscription.html#request-long-):

@Override
public void onSubscribe(Flow.Subscription subscription) {
    this.subscription = subscription;
    this.subscription.request(1);
}

Same upon processing in onNext() to request the next item.

like image 92
M A Avatar answered Oct 21 '22 01:10

M A