I am implementing an EventProcessorHost client for an Azure Event Hub.
I have a class that implements IEventProcessor as follows:
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

public class MyEventProcessor : IEventProcessor
{
    Stopwatch checkpointStopWatch;
    // Field for the handler assigned in OpenAsync; MyEventHandler is defined elsewhere.
    MyEventHandler eventHandler;
    //TODO: get provider id from parent class

    public async Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Debug.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    public Task OpenAsync(PartitionContext context)
    {
        Debug.WriteLine("SimpleEventProcessor initialized. Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        eventHandler = new MyEventHandler();
        this.checkpointStopWatch = new Stopwatch();
        this.checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());
            Debug.WriteLine(data);
        }

        // Checkpoint every 5 minutes, so that after a restart the worker resumes from at most 5 minutes back.
        if (this.checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            this.checkpointStopWatch.Restart();
        }
    }
}
I then call this as follows:
EventProcessorHost _eventProcessorHost = new EventProcessorHost(eventProcessorHostName, EndpointName, EventHubConsumerGroup.DefaultGroupName, ConnectionString, storageConnectionString, "messages-events");
await _eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>();
I need to pass a parameter to the instance of MyEventProcessor that the EventProcessorHost creates. How would I go about doing this?
Use RegisterEventProcessorFactoryAsync instead of RegisterEventProcessorAsync and pass it a factory instance. The host calls the factory's CreateEventProcessor method whenever it needs a processor, so the factory can hand whatever parameters are appropriate to the processor's constructor: values you passed into the factory when you created it, or values the factory computes itself. In the code sketched out below, two extra parameters are passed into the IEventProcessor: one comes from the factory's constructor argument, and the other is a counter of how many times the factory has been called.
class AzureStreamProcessor : IEventProcessor
{
    ....
}

class AzureStreamProcessorFactory : IEventProcessorFactory
{
    public AzureStreamProcessorFactory(string str)
    {
        this.randomString = str;
    }

    private string randomString;
    private int numCreated = 0;

    IEventProcessor IEventProcessorFactory.CreateEventProcessor(PartitionContext context)
    {
        return new AzureStreamProcessor(context, randomString, Interlocked.Increment(ref numCreated));
    }
}

host.RegisterEventProcessorFactoryAsync(new AzureStreamProcessorFactory("a parameter"), options);
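Applied to the question's own class, the same pattern might look like the sketch below. It assumes the parameter is a string provider id (suggested by the TODO in the question); the names MyEventProcessorFactory, ProviderId, Registration, and the value "provider-42" are made up for illustration, and the processor body is reduced to the minimum needed to show where the parameter ends up.

```csharp
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging;

// The processor now takes its parameter through a constructor.
public class MyEventProcessor : IEventProcessor
{
    // Hypothetical parameter: the provider id mentioned in the question's TODO.
    public string ProviderId { get; private set; }

    public MyEventProcessor(string providerId)
    {
        ProviderId = providerId;
    }

    public Task OpenAsync(PartitionContext context)
    {
        return Task.FromResult<object>(null);
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData eventData in messages)
        {
            string data = Encoding.UTF8.GetString(eventData.GetBytes());
            // The injected value is available wherever the processor needs it.
            Debug.WriteLine("[" + ProviderId + "] " + data);
        }
        return Task.FromResult<object>(null);
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        return Task.FromResult<object>(null);
    }
}

// The factory holds the parameter and forwards it to every processor it creates.
public class MyEventProcessorFactory : IEventProcessorFactory
{
    private readonly string providerId;

    public MyEventProcessorFactory(string providerId)
    {
        this.providerId = providerId;
    }

    public IEventProcessor CreateEventProcessor(PartitionContext context)
    {
        return new MyEventProcessor(providerId);
    }
}

public static class Registration
{
    // Replaces the RegisterEventProcessorAsync<MyEventProcessor>() call from the question.
    public static Task RegisterAsync(EventProcessorHost host)
    {
        return host.RegisterEventProcessorFactoryAsync(new MyEventProcessorFactory("provider-42"));
    }
}
```

With this in place, the call site in the question becomes `await Registration.RegisterAsync(_eventProcessorHost);`. Because the host asks the factory for a new processor per partition, anything stored in the factory is shared by all processors it creates, so mutable shared state there should be updated in a thread-safe way, as the Interlocked.Increment in the answer's counter does.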