
Using Reactive Extensions (Rx) to receive MSMQ messages with the async pattern (queue.BeginReceive, queue.EndReceive)

I have been using Rx for a while now for events in my projects, and specifically for socket programming, and it is working well: my code is easier to manage, performs better, and is easier to run and read.

Lately I have had to modify my project's process flow so that all incoming data (from socket operations) is dumped into queues (MSMQ was chosen for queueing).

MSMQ provides asynchronous calls for dequeuing messages from the queue (though in a weird pattern). I have been struggling to use Rx for this purpose, but have been unable to do so.

Question: Can someone give me a clean code example that uses Rx to receive messages from the queue with the async pattern?

I need the async operator implementation for MSMQ, analogous to something like this:

var data = Observable.FromAsyncPattern<byte[]>(
                        this.receiverSocket.BeginReceive,
                        this.receiverSocket.EndReceive)(/* some parameters */);

Thanks in advance. *cheers* to Rx and .NET

Jsinh asked Nov 04 '11 07:11


1 Answer

It would be as simple as:

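// Requires references to System.Messaging and Rx (System.Reactive.Linq).
var queue = new System.Messaging.MessageQueue("test");
var fun = Observable.FromAsyncPattern<System.Messaging.Message>(
    (cb, obj) => queue.BeginReceive(TimeSpan.FromMinutes(10), obj, cb),
    a => queue.EndReceive(a));
var obs = fun(); // starts one receive; obs yields a single Message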
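
Each call to the delegate returned by FromAsyncPattern starts one BeginReceive and produces at most one message, so a continuous stream needs the call to be repeated. One way this might be sketched is with Observable.Defer and Repeat. The queue path below is a hypothetical placeholder, the infinite timeout is an assumption rather than part of the original answer, and error handling is kept minimal:

```csharp
using System;
using System.Messaging;      // .NET Framework assembly System.Messaging.dll
using System.Reactive.Linq;  // Rx

class Program
{
    static void Main()
    {
        // Hypothetical queue path; point this at a queue that exists on your machine.
        var queue = new MessageQueue(@".\private$\test");

        // Wrap the Begin/End pair; each invocation of receiveOne starts one receive.
        Func<IObservable<Message>> receiveOne = Observable.FromAsyncPattern<Message>(
            (cb, state) => queue.BeginReceive(MessageQueue.InfiniteTimeout, state, cb),
            ar => queue.EndReceive(ar));

        // Defer postpones the BeginReceive call until subscription.
        // Repeat resubscribes after each completed receive, so messages
        // are pulled one after another until an error occurs or the
        // subscription is disposed.
        IObservable<Message> messages = Observable.Defer(receiveOne).Repeat();

        using (messages.Subscribe(
            m => Console.WriteLine("Received message {0}", m.Id),
            ex => Console.WriteLine("Receive failed: {0}", ex.Message)))
        {
            Console.ReadLine(); // keep the process alive while messages arrive
        }
    }
}
```

With a finite timeout instead of InfiniteTimeout, EndReceive throws a MessageQueueException when the timeout expires, which ends the stream through the error handler unless it is caught and retried.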
Ankur answered Oct 26 '22 17:10
