
MessageQueue and Async / Await

I only want to receive my messages in an async method, but it's freezing my UI:

    public async void ProcessMessages()
    {
        MessageQueue MyMessageQueue = new MessageQueue(@".\private$\MyTransactionalQueue");
        MyMessageQueue.Formatter = new XmlMessageFormatter(new Type[] { typeof(string) });

        while (true)
        {
            MessageQueueTransaction MessageQueueTransaction = new MessageQueueTransaction();
            MessageQueueTransaction.Begin();

            ContainError = false;
            ProcessPanel.SetWaiting();

            string Body = MyMessageQueue.Receive(MessageQueueTransaction).Body.ToString();

            //Do some process with body string.

            MessageQueueTransaction.Commit();
        }
    }

I am just calling the method like any regular method and it's not working! This code used to work when I was using BackgroundWorkers instead of async/await.

Ideas?

Fraga asked Apr 19 '13 08:04



2 Answers

As Stephen writes, async doesn't run your code on a separate thread. Fortunately, you can use TaskFactory.FromAsync with MessageQueue.BeginReceive/MessageQueue.EndReceive to receive messages asynchronously:

    private async Task<Message> MyAsyncReceive()
    {
        MessageQueue queue = new MessageQueue();
        ...
        // Wrap the BeginReceive/EndReceive pair in a Task so it can be awaited
        var message = await Task.Factory.FromAsync<Message>(
                          queue.BeginReceive(),
                          queue.EndReceive);

        return message;
    }

Note, though, that there is no overload of BeginReceive that takes a transaction. From the BeginReceive documentation:

Do not use the asynchronous call BeginReceive with transactions. If you want to perform a transactional asynchronous operation, call BeginPeek, and put the transaction and the (synchronous) Receive method within the event handler you create for the peek operation.

This makes sense, as there is no guarantee how long you will have to wait for a response or which thread will eventually handle the completed call.

To use transactions you would write something like this:

    private async Task<Message> MyAsyncReceive()
    {
        var queue = new MessageQueue();

        // Wait asynchronously until a message is available. The peeked message
        // is discarded: another client may take it before Receive is called.
        await Task.Factory.FromAsync<Message>(queue.BeginPeek(), queue.EndPeek);

        using (var tx = new MessageQueueTransaction())
        {
            tx.Begin();

            // Someone may have taken the last message, don't wait forever.
            // Use a smaller timeout if the queue is local.
            var message = queue.Receive(TimeSpan.FromSeconds(1), tx);
            // Process the results inside the transaction
            tx.Commit();
            return message;
        }
    }

UPDATE

As Rob pointed out, the original code returned the message obtained from Peek, but a different message may be at the head of the queue by the time Receive runs. In that case the message that Receive actually removed from the queue would be lost.

There's still a chance of blocking, though, if another client reads the last message in the queue between Peek and Receive. To prevent this, Receive should use a small timeout.
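One way to deal with an expired timeout, as a minimal sketch: when the timeout passed to Receive elapses, Receive throws a MessageQueueException whose MessageQueueErrorCode is IOTimeout. The helper below turns that case into a null return. The names QueueHelpers and TryReceive are hypothetical and not part of System.Messaging, and the caller is assumed to abort the transaction when it gets null.

```csharp
using System;
using System.Messaging;

public static class QueueHelpers
{
    // Hypothetical helper: returns null instead of throwing when no message
    // arrives before the timeout expires. Any other queue error is rethrown.
    public static Message TryReceive(MessageQueue queue,
                                     TimeSpan timeout,
                                     MessageQueueTransaction tx)
    {
        try
        {
            return queue.Receive(timeout, tx);
        }
        catch (MessageQueueException ex)
        {
            if (ex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
            {
                // Another client took the message between Peek and Receive.
                return null;
            }
            throw;
        }
    }
}
```

With a helper like this, the receive loop can simply go back to waiting on BeginPeek when the result is null.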

Panagiotis Kanavos answered Oct 21 '22 17:10


async does not run your code on a background thread. Because your method contains no await, your code above should have caused a compiler warning telling you that the method will run synchronously.

If you want to execute a method on a background thread, use TaskEx.Run (Task.Run in .NET 4.5):

    public void ProcessMessages()
    {
        ...
    }

    TaskEx.Run(() => ProcessMessages());
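
The difference can be seen in a small console sketch. It assumes .NET 4.5, where the method is Task.Run, and the class and method names (AsyncThreadDemo, NoAwaitAsync, OnThreadPoolAsync) are made up for illustration. An async method without await completes on the calling thread, while a delegate passed to Task.Run executes on a thread-pool thread.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThreadDemo
{
    // An async method with no await runs to completion synchronously on the
    // caller's thread (this is what compiler warning CS1998 is about).
#pragma warning disable 1998
    public static async Task<int> NoAwaitAsync()
    {
        return Thread.CurrentThread.ManagedThreadId;
    }
#pragma warning restore 1998

    // Task.Run queues the delegate to the thread pool and reports whether
    // it really executed on a pool thread.
    public static Task<bool> OnThreadPoolAsync()
    {
        return Task.Run(() => Thread.CurrentThread.IsThreadPoolThread);
    }

    public static void Main()
    {
        int callerId = Thread.CurrentThread.ManagedThreadId;
        int inlineId = NoAwaitAsync().Result;
        bool ranOnPool = OnThreadPoolAsync().Result;

        Console.WriteLine("Same thread as caller: {0}", inlineId == callerId);
        Console.WriteLine("Task.Run used a pool thread: {0}", ranOnPool);
    }
}
```

In a UI application, a blocking loop like the one in the question behaves like NoAwaitAsync: it stays on the UI thread unless it is explicitly moved off it.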
Stephen Cleary answered Oct 21 '22 16:10