
How to pass parameters to an implementation of IEventProcessor

I am busy implementing an EventProcessorHost client for an Azure Event Hub.

I have a class that implements IEventProcessor as follows:

    public class MyEventProcessor : IEventProcessor
    {
        Stopwatch checkpointStopWatch;
        MyEventHandler eventHandler; // assigned in OpenAsync

        //TODO: get provider id from parent class

        public async Task CloseAsync(PartitionContext context, CloseReason reason)
        {
            Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
            if (reason == CloseReason.Shutdown)
            {
                await context.CheckpointAsync();
            }
        }

        public Task OpenAsync(PartitionContext context)
        {
            Debug.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
            eventHandler = new MyEventHandler();
            this.checkpointStopWatch = new Stopwatch();
            this.checkpointStopWatch.Start();
            return Task.FromResult<object>(null);
        }

        async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
        {
            foreach (EventData eventData in messages)
            {
                string data = Encoding.UTF8.GetString(eventData.GetBytes());              
                Debug.WriteLine(data);       
            }
            //Checkpoint every 5 minutes, so that a restarted worker resumes from at most 5 minutes back.
            if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
            {
                await context.CheckpointAsync();
                this.checkpointStopWatch.Restart();
            }
        }
    }

I then call this as follows:

 EventProcessorHost _eventProcessorHost = new EventProcessorHost(eventProcessorHostName, EndpointName, EventHubConsumerGroup.DefaultGroupName, ConnectionString, storageConnectionString, "messages-events");
 await _eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();

I need to pass a parameter to the instance of MyEventProcessor which the EventProcessorHost creates. How would I go about doing this?

Zapnologica asked Oct 27 '15 15:10


1 Answer

Use RegisterEventProcessorFactoryAsync and pass in a factory instance instead of registering the processor type. The host then calls the factory's CreateEventProcessor method whenever it needs a processor, so the factory can hand each processor whatever parameters it needs: either values that were passed to the factory's own constructor, or state that the factory tracks itself. In the code sketched out below, two such parameters are passed to the IEventProcessor. One comes from the factory's constructor argument, and the other is a counter of how many times the factory has been called.

class AzureStreamProcessor : IEventProcessor
{
     ....
}

class AzureStreamProcessorFactory : IEventProcessorFactory
{
    private readonly string randomString;  // supplied by the caller
    private int numCreated = 0;            // number of processors created so far

    public AzureStreamProcessorFactory(string str)
    {
        this.randomString = str;
    }

    IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context)
    {
        // Interlocked.Increment keeps the counter correct if the factory is called from several threads.
        return new AzureStreamProcessor(context, randomString, Interlocked.Increment(ref numCreated));
    }
}

await host.RegisterEventProcessorFactoryAsync(new AzureStreamProcessorFactory("a parameter"), options);
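Applied to the MyEventProcessor class from the question, the same idea might look roughly like the sketch below. The ProviderId property, the MyEventProcessorFactory class, and the "provider-42" value are hypothetical names chosen for illustration. The sketch assumes the Microsoft.ServiceBus.Messaging namespace that the question's types come from, and it leaves out the checkpoint timer to stay short.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// Processor that receives its parameter through the constructor.
public class MyEventProcessor : IEventProcessor
{
    // Hypothetical parameter; stands in for the "provider id" from the question.
    public string ProviderId { get; private set; }

    public MyEventProcessor(string providerId)
    {
        ProviderId = providerId;
    }

    public Task OpenAsync(PartitionContext context)
    {
        Debug.WriteLine("Provider '{0}' opened partition '{1}'.", ProviderId, context.Lease.PartitionId);
        return Task.FromResult<object>(null);
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());
            Debug.WriteLine("[{0}] {1}", ProviderId, data);
        }
        return Task.FromResult<object>(null);
    }

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        // Save progress on a clean shutdown, as in the question's code.
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }
}

// Factory that remembers the parameter and hands it to every processor it creates.
public class MyEventProcessorFactory : IEventProcessorFactory
{
    private readonly string providerId;

    public MyEventProcessorFactory(string providerId)
    {
        this.providerId = providerId;
    }

    public IEventProcessor CreateEventProcessor(PartitionContext context)
    {
        return new MyEventProcessor(providerId);
    }
}

// Registration, replacing RegisterEventProcessorAsync<MyEventProcessor>() from the question:
//   await _eventProcessorHost.RegisterEventProcessorFactoryAsync(new MyEventProcessorFactory("provider-42"));
```

Because the host asks the factory for a processor whenever it needs one, every MyEventProcessor instance it creates sees the same ProviderId. Anything that should differ per partition can be read from the PartitionContext argument inside CreateEventProcessor instead.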

cacsar answered Sep 20 '22 07:09