
Create an observable for an event loop

I have been reading a little about RxJava recently, and I am wondering whether the framework fits the bill as a communication system between threads. I am working on a REST server written in Java. Each time a resource is PUT or POSTed, I want to do some computation using a pool of worker threads. However, I would still like to be able to monitor the requests, for example to print out some statistics. Essentially, I would like an Observable so that I can handle the requests flexibly with multiple Observers.

My question is: how can I create a suitable Observable? Most guides I have seen deal with operations on Observables, such as mapping. The Observables themselves are mostly created from collections or integer ranges, and in those cases it seems to be impossible to push new values into the Observable after it has been created. Apparently the only way to retain this flexibility is to use Observable.create. However, this seems rather low-level: I would have to maintain a queue for each new subscriber and do a synchronized push to every single one. Is this really necessary, or is something like this already implemented in RxJava?

hfhc2 asked Mar 07 '16 10:03





1 Answer

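What you're looking for is a Subject. A Subject acts as both an Observer and an Observable: you push values into it with onNext, and it forwards them to everything subscribed to it. A ReplaySubject, for example, replays every event it has received to each subscriber, including subscribers that join later.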

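// Types as in RxJava 2.x/3.x; in RxJava 1.x this is Subject<String, String> and onCompleted()
Subject<String> replaySubject = ReplaySubject.create();
// Observable side: register a subscriber that prints each event
replaySubject.subscribe(s -> System.out.println(s));

// elsewhere...

// Observer side: push events into the subject
replaySubject.onNext("First");
replaySubject.onNext("Second");
replaySubject.onComplete();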
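
To make the "both Observer and Observable" idea concrete, here is a minimal plain-Java sketch of what a replaying subject does conceptually. It is not RxJava's implementation: MiniReplaySubject and ReplaySketchDemo are hypothetical names invented for this illustration, and the sketch simply serializes every call with one lock, which is roughly the bookkeeping the question wanted to avoid writing by hand. With RxJava itself you would use ReplaySubject (or another Subject), and if several worker threads call onNext concurrently, wrapping it with toSerialized() is the usual safeguard.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical illustration class; NOT part of RxJava.
final class MiniReplaySubject<T> {
    private final List<T> history = new ArrayList<>();
    private final List<Consumer<? super T>> subscribers = new ArrayList<>();
    private boolean completed = false;

    // "Observable" side: replay everything seen so far, then register for future events.
    synchronized void subscribe(Consumer<? super T> subscriber) {
        for (T event : history) {
            subscriber.accept(event);
        }
        if (!completed) {
            subscribers.add(subscriber);
        }
    }

    // "Observer" side: record the event and forward it to every current subscriber.
    // One lock serializes pushes from different threads. Subscribers that call
    // subscribe() from inside their own callback are not handled in this sketch.
    synchronized void onNext(T event) {
        if (completed) {
            return; // events after completion are ignored
        }
        history.add(event);
        for (Consumer<? super T> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }

    // After completion no further events are delivered; history stays replayable.
    synchronized void onComplete() {
        completed = true;
        subscribers.clear();
    }
}

final class ReplaySketchDemo {
    public static void main(String[] args) throws InterruptedException {
        MiniReplaySubject<String> requests = new MiniReplaySubject<>();

        // A monitoring subscriber, registered before any request arrives.
        requests.subscribe(r -> System.out.println("logger: " + r));

        // Worker threads push events into the subject as requests are handled.
        ExecutorService workers = Executors.newFixedThreadPool(2);
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            workers.submit(() -> requests.onNext("request " + id));
        }
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);

        // A late subscriber still receives the full history via replay.
        List<String> seen = new ArrayList<>();
        requests.subscribe(seen::add);
        System.out.println("late subscriber saw " + seen.size() + " events");

        requests.onComplete();
    }
}
```

The order of the "logger" lines depends on thread scheduling. If late subscribers should only see events emitted after they subscribe, which may suit live request monitoring better than a full replay, RxJava's PublishSubject has that behavior.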
tddmonkey answered Sep 23 '22 18:09
