
How to set the EventProcessorHost to read events from now on (UTC)?

We are using the EventProcessorHost to receive events from Azure Event Hubs. I have been trying, without success, to configure it (through EventProcessorOptions.InitialOffsetProvider) to read only events enqueued from the current UTC time onward, but it always reads from the start of the stream. I am not saving checkpoints, and I even deleted the blob container that the host created. This is how I am setting it:

DateTime startDate = DateTime.UtcNow;

var epo = new EventProcessorOptions
{
    MaxBatchSize = 100,
    PrefetchCount = 100,
    ReceiveTimeOut = TimeSpan.FromSeconds(120),
    InitialOffsetProvider = (name) => startDate
};

Any guidance would be appreciated.

Jose Parra asked Nov 23 '15 21:11


2 Answers

I think this changed in version 2.0.0; Rajiv's code would now be:

var eventProcessorOptions = new EventProcessorOptions
{
    InitialOffsetProvider = (partitionId) => EventPosition.FromEnqueuedTime(DateTime.UtcNow)
};

Here is a fuller example with fully qualified class names:

    private static async Task MainAsync(string[] args)
    {
        try
        {
            Console.WriteLine("Registering EventProcessor...");

            string AISEhConnectionStringEndpoint = Configuration["AISEhConnectionStringEndpoint"];
            string AISEhConnectionStringSharedAccessKeyName = Configuration["AISEhConnectionStringSharedAccessKeyName"];
            string AISEhConnectionStringSharedAccessKey = Configuration["AISEhConnectionStringSharedAccessKey"];
            string EhConnectionString = $"Endpoint={AISEhConnectionStringEndpoint};SharedAccessKeyName={AISEhConnectionStringSharedAccessKeyName};SharedAccessKey={AISEhConnectionStringSharedAccessKey}";
            string AISEhEntityPath = Configuration["AISEhEntityPath"];
            string AISEhConsumerGroupName = Configuration["AISEhConsumerGroupName"];
            string AISStorageContainerName = Configuration["AISStorageContainerName"];
            string AISStorageAccountName = Configuration["AISStorageAccountName"];
            string AISStorageAccountKey = Configuration["AISStorageAccountKey"];

            string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", AISStorageAccountName, AISStorageAccountKey);

            var eventProcessorHost = new Microsoft.Azure.EventHubs.Processor.EventProcessorHost(
                AISEhEntityPath,
                AISEhConsumerGroupName,
                EhConnectionString,
                StorageConnectionString,
                AISStorageContainerName);

            var options = new Microsoft.Azure.EventHubs.Processor.EventProcessorOptions
            {
                InitialOffsetProvider = (partitionId) => Microsoft.Azure.EventHubs.EventPosition.FromEnqueuedTime(DateTime.UtcNow)
            };

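            // Registers the event processor with the host and starts receiving messages
            await eventProcessorHost.RegisterEventProcessorAsync<GetEvents>(options);

            // Keeps the process alive without blocking a thread. This wait never
            // completes, so the unregister call below is reached only if it is
            // replaced with a cancellable wait.
            await Task.Delay(Timeout.Infinite);

            // Unregisters the event processor and stops receiving
            await eventProcessorHost.UnregisterEventProcessorAsync();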
        }
        catch(Exception ex)
        {
            Console.WriteLine(ex.Message);
            NLog.LogManager.GetCurrentClassLogger().Error(ex);
            throw;
        }
    }

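The registration call above refers to a GetEvents class that is not shown. As a rough sketch only, assuming it implements IEventProcessor from Microsoft.Azure.EventHubs.Processor, it could look something like this. The DecodeBody helper and the console output are illustrative choices, not part of the original answer, and the sketch deliberately never calls context.CheckpointAsync(), matching the question's "not saving checkpoints" setup.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;

// Hypothetical stand-in for the GetEvents class used in the registration call.
// It needs a public parameterless constructor so the host can create one
// instance per partition.
public class GetEvents : IEventProcessor
{
    // Illustrative helper: decodes an event body as UTF-8 text.
    public static string DecodeBody(ArraySegment<byte> body)
    {
        return Encoding.UTF8.GetString(body.Array, body.Offset, body.Count);
    }

    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"Opened partition {context.PartitionId}");
        return Task.CompletedTask;
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"Closed partition {context.PartitionId}: {reason}");
        return Task.CompletedTask;
    }

    public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        // Defensive null check; treat a missing batch as empty.
        foreach (EventData eventData in messages ?? Enumerable.Empty<EventData>())
        {
            DateTime enqueued = eventData.SystemProperties.EnqueuedTimeUtc;
            Console.WriteLine($"[{context.PartitionId}] {enqueued:O} {DecodeBody(eventData.Body)}");
        }

        // No checkpoint is written here, so the initial offset provider
        // decides the starting position on every run.
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"Error on partition {context.PartitionId}: {error.Message}");
        return Task.CompletedTask;
    }
}
```

Printing the enqueued time next to each event makes it easy to check whether the host really started from the requested UTC time.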

Here are my general settings, with secrets and exact addresses obscured, in case they help; I found working this out to be less pleasurable than extracting teeth:

"AISEhConnectionStringEndpoint": "sb://<my bus address>.servicebus.windows.net/",
"AISEhConnectionStringSharedAccessKeyName": "<my key name>",
"AISEhConnectionStringSharedAccessKey": "<yeah nah>",
"AISEhEntityPath": "<Event Hub entity path>",
"AISEhConsumerGroupName":  "<consumer group name, e.g. $Default>",
"AISStorageContainerName":  "<storage container name>",
"AISStorageAccountName": "<storage account name>",
"AISStorageAccountKey": "<yeah nah>",
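
The example reads these values through a Configuration indexer that the answer does not show. One possible way to build it, assuming the settings above are wrapped in a single JSON object in a file named appsettings.json (a hypothetical name) and that the Microsoft.Extensions.Configuration.Json package is referenced:

```csharp
using System.IO;
using Microsoft.Extensions.Configuration;

public static class Settings
{
    // Assumption: the keys listed above sit at the top level of
    // appsettings.json in the working directory. The file name and this
    // Settings class are illustrative, not taken from the original answer.
    public static IConfigurationRoot Configuration { get; } = new ConfigurationBuilder()
        .SetBasePath(Directory.GetCurrentDirectory())
        .AddJsonFile("appsettings.json", optional: false, reloadOnChange: false)
        .Build();
}
```

With that in place, Settings.Configuration["AISEhEntityPath"] would return the configured entity path, or null if the key is missing.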
leighghunt answered Oct 20 '22 20:10


I found that the checkpoint folder in the blob container was still there, and my app was using it and ignoring the date I set in EventProcessorOptions. After I deleted the container, it started to run as expected (taking the UTC date into account).
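
The behavior described above can be pictured with a small model: a stored checkpoint for a partition takes precedence, and the initial offset provider is consulted only when none exists. This is a simplified sketch of that rule, not the host's actual code; StartPositionSketch, Resolve, and the returned strings are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class StartPositionSketch
{
    // Hypothetical model of the start-position choice for one partition.
    // storedCheckpoints maps partition id to the offset saved in blob storage.
    public static string Resolve(
        string partitionId,
        IReadOnlyDictionary<string, string> storedCheckpoints,
        Func<string, DateTime> initialOffsetProvider)
    {
        // A leftover checkpoint wins, so the provider is never called.
        if (storedCheckpoints.TryGetValue(partitionId, out string offset))
        {
            return "checkpoint offset " + offset;
        }

        // No checkpoint: fall back to the configured initial position.
        return "enqueued time " + initialOffsetProvider(partitionId).ToString("O");
    }
}

public static class Program
{
    public static void Main()
    {
        DateTime start = DateTime.UtcNow;
        var stale = new Dictionary<string, string> { ["0"] = "4096" };

        // Partition 0 has a leftover checkpoint, partition 1 does not.
        Console.WriteLine(StartPositionSketch.Resolve("0", stale, _ => start));
        Console.WriteLine(StartPositionSketch.Resolve("1", stale, _ => start));
    }
}
```

In this model, deleting the container corresponds to emptying storedCheckpoints, after which every partition falls through to the provider.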

Jose Parra answered Oct 20 '22 19:10