Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Service Bus Workflow Activities

I would like to access Service Bus Queues and Topics from Workflows with some specific activities.

I couldn't find anything fitting this scenario (this MSDN article and this article by Roman Kiss) are the nearest one.

I would like to design a custom activity which uses the QueueClient to receive asynchronously the brokered messages, using the BeginReceive method implemented with the async/await pattern (please see my question about it).

First of all, I would like to ask if it there are any reasons why I should prefer the suggested approach (adapted WCF) instead of my desired one (using the QueueClient).

Then, I would appreciate help designing it in a persistence-friendly way.

Update:

This is what I tried so far:

public class AsyncReceiveBrokeredMessage : AsyncCodeActivity<BrokeredMessage>
{
    [RequiredArgument]
    public InArgument<string> ConnectionString { get; set; }

    [RequiredArgument]
    public InArgument<string> Path { get; set; }

    protected sealed override IAsyncResult BeginExecute(AsyncCodeActivityContext context, AsyncCallback callback, object state)
    {
        var connectionString = this.ConnectionString.Get(context);
        var path = this.Path.Get(context);
        var queueClient = QueueClient.CreateFromConnectionString(connectionString, path);
        var cts = new CancellationTokenSource();
        context.UserState = new ReceiveState
                                {
                                    CancellationTokenSource = cts,
                                    QueueClient = queueClient
                                };
        var task = ExecuteAsync(context, cts.Token);
        var tcs = new TaskCompletionSource<BrokeredMessage>(state);
        task.ContinueWith(
            t =>
                {
                    if (t.IsFaulted)
                    {
                        tcs.TrySetException(t.Exception.InnerExceptions);
                    }
                    else if (t.IsCanceled)
                    {
                        tcs.TrySetCanceled();
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }

                    if (callback != null)
                    {
                        callback(tcs.Task);
                    }
                });

        return tcs.Task;
    }

    protected sealed override BrokeredMessage EndExecute(AsyncCodeActivityContext context, IAsyncResult result)
    {
        var task = (Task<BrokeredMessage>)result;
        try
        {
            return task.Result;
        }
        catch (OperationCanceledException)
        {
            if (context.IsCancellationRequested)
            {
                context.MarkCanceled();
            }
            else
            {
                throw;
            }

            return null; // or throw?
        }
        catch (AggregateException exception)
        {
            if (exception.InnerException is OperationCanceledException)
            {
                if (context.IsCancellationRequested)
                {
                    context.MarkCanceled();
                }
                else
                {
                    throw;
                }

                return null; // or throw?
            }

            ExceptionDispatchInfo.Capture(exception.InnerException).Throw();
            throw;
        }
    }

    protected override void Cancel(AsyncCodeActivityContext context)
    {
        var state = (ReceiveState)context.UserState;
        state.CancellationTokenSource.Cancel();
    }

    private async Task<BrokeredMessage> ExecuteAsync(
        AsyncCodeActivityContext context, CancellationToken cancellationToken)
    {
        var receiveState = context.UserState as ReceiveState;
        var receiveTask = Task<BrokeredMessage>.Factory.FromAsync(
            receiveState.QueueClient.BeginReceive, receiveState.QueueClient.EndReceive, null);
        var completionTask = receiveTask.ContinueWith(
             t =>
                 {
                     BrokeredMessage result;
                     if (t.IsCanceled)
                     {
                         context.MarkCanceled();
                         result = null;
                     }
                     else if (t.IsFaulted)
                     {
                         result = null;
                     }
                     else
                     {

                         t.Result.Complete();
                         result = t.Result;
                     }

                     receiveState.QueueClient.Close();
                     return result;
                 },
             cancellationToken);
        return await completionTask;
    }

    private class ReceiveState
    {
        public CancellationTokenSource CancellationTokenSource { get; set; }

        public QueueClient QueueClient { get; set; }
    }
}

And tested this way (using local Windows Server Service Bus):

var connectionString = new Variable<string>
                                   {
                                       Default = connectionStringValue
                                   };
        var path = new Variable<string>
                       {
                           Default = pathValue
                       };
        var test = new While
                       {
                           Body =
                               new Pick
                                   {
                                       Branches =
                                           {
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new AsyncReceiveBrokeredMessage
                                                               {
                                                                   ConnectionString = new InArgument<string>(connectionString),
                                                                   Path = new InArgument<string>(path)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Received message"
                                                               }
                                                   },
                                               new PickBranch
                                                   {
                                                       Trigger =
                                                           new Delay
                                                               {
                                                                   Duration = TimeSpan.FromSeconds(10)
                                                               },
                                                       Action =
                                                           new WriteLine
                                                               {
                                                                   Text =
                                                                       "Timeout!"
                                                               }
                                                   }
                                           }
                                   },
                           Condition = true,
                           Variables = { connectionString, path }
                       };
        WorkflowInvoker.Invoke(test);

I receive messages as expected if I continuously send them. Problems come after the first timeout, because then I'm not receiving anymore any message. Any clarification is appreciated.

like image 274
fra Avatar asked Jun 08 '13 19:06

fra


People also ask

How does a Service Bus work?

Service Bus is used to decouple applications and services from each other, providing the following benefits: Load-balancing work across competing workers. Safely routing and transferring data and control across service and application boundaries. Coordinating transactional work that requires a high-degree of ...

What are the four Service Bus communication mechanisms?

Architecture. Azure Service Bus offers three types of communication mechanisms; queues, topics and, relays. Queues and Topics, facilitate one-directional communication.

How many queues are in a Service Bus?

10,000 for the Basic or Standard tier. The total number of topics and queues in a namespace must be less than or equal to 10,000. For the Premium tier, 1,000 per messaging unit (MU).

What is a Service Bus topic?

Service Bus Topics and Subscriptions is an API messaging service that allows different applications and clients (message senders) to publish messages that are then consumed by other sets of applications and services (message receivers).


1 Answers

First you need to know some important things: 1) Workflows are long running processes meant to be pausable and restorable later. 2) The way workflows get woken up and restored is Bookmarks. 3) Usually people like their workflows to be persistable while being paused as well. (If you don't care about persistance why are you using WF anyway - just for the visual design tooling?)

Logical problem:

If all your workflows and their activities are persisted and suspended, then none of your activity code is even loaded, so who is doing the listening? Answer: something else, not an Activity, has to be the thing that is listening on the ServiceBus queue and taking responsibility for resuming bookmarks to wake up your workflows.

That something is the workflow 'Host', or some extension of it. Here are a couple blog posts about how you can customize a host to listens to messages [from a GUI button] and wake up a workflow activity.

http://blogs.msdn.com/b/tilovell/archive/2011/02/26/wf4-workflow-4-0-hosting-extensions-redux.aspx

http://blogs.msdn.com/b/tilovell/archive/2010/06/08/wf4-workflow-4-0-hosting-extensions.aspx

What you could do is take this code and adapt it to listen on a ServiceBus queue instead of a GUI button, and wake up your own ReceiveFromServiceBus activity, which is analogous to PageActivity - note you have to be writing a NativeActivity in order to work with bookmarks properly.

All rather cumbersome... but I believe the 'right' way to do it with WF.

like image 134
Tim Lovell-Smith Avatar answered Dec 12 '22 17:12

Tim Lovell-Smith