How do I create a Flux/Publisher for streaming data?

I am using a polling method to fetch data periodically. New data may arrive at any time. I want to expose a reactive interface to my client. So, I want to create a publisher (Flux?) that would publish new data when it becomes available and notify the subscriber(s). How do I do that? All the examples of Flux that I see are for the cases where the data is already known/available. Effectively, I want something like a Flux based on a queue and my polling thread can keep filling the queue when it finds new data.

Pratik asked Jun 24 '19 23:06



1 Answer

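For something simple, you might want to use a DirectProcessor. It isn't the most sophisticated of Reactor's sinks, but it will get you part of the way there. Note that DirectProcessor is deprecated as of Reactor 3.4 in favor of the Sinks API (the closest match is Sinks.many().multicast()), so check which version you are on.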

I wrote a quick example:

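// Declare the variable as DirectProcessor, not Flux: onNext() is not part of the Flux API.
DirectProcessor<String> hot = DirectProcessor.create();
hot.onNext("Hello"); // not printed: nobody has subscribed yet
hot.subscribe(it -> System.out.println(it));

hot.onNext("Goodbye"); // printed
Thread.sleep(100);
hot.onNext("foo"); // printed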

DirectProcessor extends Flux, so you can use it like a Flux.

As you can see, elements emitted before anyone subscribes to the hot source are not passed down to the subscriber.

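Looking at other posts, Flux#create and Flux#generate might be good places to start. The question "Difference Between Flux.create and Flux.generate" compares them. Both are more complex than a processor but give you more control over the Flux. Flux#create in particular lets you push items from another thread, such as your poller, while Flux#generate produces items synchronously, one per callback.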
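
The queue-plus-polling-thread idea from the question can be sketched without Reactor on the classpath. The block below is a minimal sketch, not the answerer's code: it swaps in the JDK's own java.util.concurrent.SubmissionPublisher (a Flow.Publisher, Java 9+), where a Reactor application would more likely use Flux#create and call FluxSink#next from the polling thread. PollingPublisherDemo, CollectingSubscriber, and run are hypothetical names, and the in-memory queue stands in for whatever the real poller fetches.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; none of these names come from the original post.
class PollingPublisherDemo {

    // Collects every item it receives and signals when the stream terminates.
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(String item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    // Publishes everything found in the (assumed) data source and returns
    // what the subscriber saw, in arrival order.
    static List<String> run(List<String> polledItems) {
        // Stand-in for the real data source that the poller checks.
        BlockingQueue<String> source = new LinkedBlockingQueue<>(polledItems);
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try {
            try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
                publisher.subscribe(subscriber);

                // The polling thread: a real poller would loop on a schedule;
                // this one simply drains the queue once and stops.
                Thread poller = new Thread(() -> {
                    String item;
                    while ((item = source.poll()) != null) {
                        // submit() blocks if the subscriber's buffer is full.
                        publisher.submit(item);
                    }
                });
                poller.start();
                poller.join();
            } // close() signals onComplete once buffered items are delivered.

            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while publishing", e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c")));
    }
}
```

Calling PollingPublisherDemo.run(List.of("a", "b", "c")) should hand back the same three items in order, since a single subscriber sees items in submission order. Like the DirectProcessor example, this publisher is hot: anything submitted while there are no subscribers is dropped rather than replayed.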

Carson Graham answered Oct 19 '22 11:10
