RavenDB Stream for Unbounded Results - Connection Resilience

Tags: c#, ravendb

We're using the Stream functionality in RavenDB to load, transform and migrate data between 2 databases like so:

var query = originSession.Query<T>(IndexForQuery);

using (var stream = originSession.Advanced.Stream(query))
{
    while (stream.MoveNext())
    {
        var streamedDocument = stream.Current.Document;

        OpenSessionAndMigrateSingleDocument(streamedDocument);
    }
}

The problem is that one of the collections has millions of rows, and we keep receiving an IOException in the following format:

Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
   at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
   at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
   at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
   at System.IO.StreamReader.Read(Char[], Int32, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
   at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
   at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
   at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
   at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
   at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
   at MigrateToNewSchema.Program.Main(System.String[])

This happens quite a long way into the stream, and transient connection issues are inevitable over that sort of period (the migration takes hours to complete).

However, because we are using a Query, a retry has to start from scratch. Ultimately, if there is a connection failure anywhere during the stream, we have to run the whole thing again, and again, until it works end to end.

I know you can pass an ETag to Stream to effectively restart from a certain point, but there is no overload that combines this with a Query, which we need in order to filter the results being migrated and target the correct collection.
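
For reference, the ETag overload we mean looks roughly like the following. This is only a sketch against the RavenDB 3.x client (the exact signature may differ between versions), and lastProcessedEtag is something our own code would have to persist between retries. Because it streams every document from that ETag onwards, all of the filtering the Query gives us would have to be re-implemented client side:

Etag lastProcessedEtag = Etag.Empty; // persisted by our own retry logic (assumption)

using (var stream = originSession.Advanced.Stream<T>(lastProcessedEtag))
{
    while (stream.MoveNext())
    {
        // No index/query filter is applied server side with this overload,
        // so collection and criteria checks would have to happen here.
        var streamedDocument = stream.Current.Document;

        OpenSessionAndMigrateSingleDocument(streamedDocument);

        lastProcessedEtag = stream.Current.Etag;
    }
}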

So, in RavenDB, is there a way to either improve the internal resilience of the connection (a connection string property, internal settings, etc.) or effectively "recover" a stream after an error?

Asked Dec 31 '14 by Luke Merrett


1 Answer

As per the suggestion from @StriplingWarrior, I've recreated the solution using Data Subscriptions.

Using this approach I was able to iterate over all 2 million rows (though admittedly with much less processing per item). Two points here would have helped when we were trying to implement the same logic using Streams:

  1. Batches only get removed from the subscription "queue" once they are acknowledged (like most standard queues).
    1. The subscribed IObserver<T> has to complete successfully for this acknowledgement to be sent.
    2. The acknowledgement state is held by the server rather than the client, so the client can restart without losing the last successfully processed position in the subscription.
    3. See here for more details.
  2. As @StriplingWarrior indicated, because you can create subscriptions with filters right down to the property level, it would be possible to replay with a smaller result set in the event of an exception within the subscription itself (see the sketch just after this list).
    1. The first point really supersedes this, but it gives us additional flexibility not available in the Stream API.
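
To illustrate point 2, a filtered subscription might look something like the sketch below. This is only an illustration: the exact filter properties exposed by SubscriptionCriteria should be checked against your client version, the "Lastname" filter value is hypothetical, and the usual System.Collections.Generic and Raven.Json.Linq usings are assumed:

var filteredSubscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>
{
    // Hypothetical property-level filter: only documents whose Lastname is "Smith"
    // are pushed to the subscriber, so a replay covers a much smaller result set.
    PropertiesMatch = new Dictionary<string, RavenJToken>
    {
        { "Lastname", new RavenJValue("Smith") }
    }
});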

The testing environment is a RavenDB 3.0 database (local machine, running as a Windows service) with default settings against a collection of 2 million records.

Code to generate the dummy records:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    using (var bulkInsert = store.BulkInsert())
    {
        for (var i = 0; i != recordsToCreate; i++)
        {
            var person = new Person
            {
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            };

            bulkInsert.Store(person);
        }
    }
}

Subscribing to this collection is then a case of:

using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());

    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
    {
        BatchOptions = new SubscriptionBatchOptions()
        {
            // Max number of docs that can be sent in a single batch
            MaxDocCount = 16 * 1024,  
            // Max total batch size in bytes
            MaxSize = 4 * 1024 * 1024,
            // Max time the subscription needs to confirm that the batch
            // has been successfully processed
            AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
        },
        IgnoreSubscribersErrors = false,
        ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
    });

    personSubscription.Subscribe(new PersonObserver());

    // Keep the console process alive while the subscription pushes documents to the observer
    while (true)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    }
}

Note the PersonObserver; this is just a basic implementation of IObserver<Person>, like so:

public class PersonObserver : IObserver<Person>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }

    public void OnNext(Person person)
    {
        Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'");
    }
}
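
Finally, to show the restart behaviour from point 1: because the last acknowledged position is stored server side against the subscription id, a client that crashes (or is stopped) can simply re-open the same subscription and carry on. A rough sketch, assuming persistedSubscriptionId is the id we saved when the subscription was first created:

// Re-open the existing subscription rather than creating a new one.
// The server resumes from the first unacknowledged batch, so batches the
// observer already processed successfully are not replayed.
var resumedSubscription = store.Subscriptions.Open<Person>(
    persistedSubscriptionId, new SubscriptionConnectionOptions());

resumedSubscription.Subscribe(new PersonObserver());
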
Answered Oct 26 '22 by Luke Merrett