We're using the Stream functionality in RavenDB to load, transform and migrate data between two databases, like so:
var query = originSession.Query<T>(IndexForQuery);

using (var stream = originSession.Advanced.Stream(query))
{
    while (stream.MoveNext())
    {
        var streamedDocument = stream.Current.Document;

        OpenSessionAndMigrateSingleDocument(streamedDocument);
    }
}
The problem is that one of the collections has millions of rows, and we keep receiving an IOException in the following format:
Application: MigrateToNewSchema.exe
Framework Version: v4.0.30319
Description: The process was terminated due to an unhandled exception.
Exception Info: System.IO.IOException
Stack:
at System.Net.ConnectStream.Read(Byte[], Int32, Int32)
at System.IO.Compression.DeflateStream.Read(Byte[], Int32, Int32)
at System.IO.Compression.GZipStream.Read(Byte[], Int32, Int32)
at System.IO.StreamReader.ReadBuffer(Char[], Int32, Int32, Boolean ByRef)
at System.IO.StreamReader.Read(Char[], Int32, Int32)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadData(Boolean, Int32)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadStringIntoBuffer(Char)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseString(Char)
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ParseValue()
at Raven.Imports.Newtonsoft.Json.JsonTextReader.ReadInternal()
at Raven.Imports.Newtonsoft.Json.JsonTextReader.Read()
at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
at Raven.Json.Linq.RavenJObject.Load(Raven.Imports.Newtonsoft.Json.JsonReader)
at Raven.Json.Linq.RavenJToken.ReadFrom(Raven.Imports.Newtonsoft.Json.JsonReader)
at Raven.Client.Connection.ServerClient+<YieldStreamResults>d__6b.MoveNext()
at Raven.Client.Document.DocumentSession+<YieldQuery>d__c`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MoveNext()
at MigrateToNewSchema.Migrator.DataMigratorBase`1[[System.__Canon, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089]].MigrateCollection()
at MigrateToNewSchema.Program.MigrateData(MigrateToNewSchema.Enums.CollectionToMigrate, Raven.Client.IDocumentStore, Raven.Client.IDocumentStore)
at MigrateToNewSchema.Program.Main(System.String[])
This happens quite a long way into streaming, and of course transient connection issues will occur over that sort of period (it takes hours to complete). However, when we retry, because we are using a Query we have to start from scratch. So if there is a connection failure at any point during the whole Stream, we have to try it again, and again, until it works end to end.

I know you can use ETag with Stream to effectively restart at a certain point, but there is no overload that does this with a Query, which we need in order to filter the results being migrated and specify the correct collection.

So, in RavenDB, is there a way to either improve the internal resilience of the connection (a connection string property, internal settings, etc.) or effectively "recover" a stream on an error?
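For reference, the nearest we could get with the current API is the Etag overload of Stream wrapped in a retry loop; a rough sketch only (the retry limit is arbitrary, originStore stands in for our source document store, and OpenSessionAndMigrateSingleDocument is the helper from above). The problem remains that Stream(fromEtag) is unfiltered, so every document in the database from that Etag onwards comes back, not just the collection being migrated:

Etag lastEtag = Etag.Empty;
var attempts = 0;
while (true)
{
    try
    {
        using (var session = originStore.OpenSession())
        using (var stream = session.Advanced.Stream<T>(lastEtag))
        {
            while (stream.MoveNext())
            {
                // Unfiltered, so documents from other collections would
                // have to be skipped here via stream.Current.Metadata
                OpenSessionAndMigrateSingleDocument(stream.Current.Document);
                lastEtag = stream.Current.Etag; // remember how far we got
            }
        }
        break; // completed end to end
    }
    catch (IOException) when (attempts++ < 10)
    {
        // Transient connection failure: resume from lastEtag, not from scratch
    }
}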
As per the suggestion from @StriplingWarrior, I've recreated the solution using Data Subscriptions.

Using this approach I was able to iterate over all 2 million rows (though admittedly with much less processing per item). Two points here would have helped when we were trying to implement the same logic using Streams: batches only leave the subscription queue once they are acknowledged, and the IObserver<T> has to complete successfully for that acknowledgment to be set; and because the acknowledgment is tracked by the server, a reconnecting client resumes from the last acknowledged batch rather than starting from scratch.

The testing environment was a RavenDB 3.0 database (local machine, running as a Windows service) with default settings, against a collection of 2 million records.
Code to generate the dummy records:
using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    using (var bulkInsert = store.BulkInsert())
    {
        for (var i = 0; i != recordsToCreate; i++)
        {
            var person = new Person
            {
                Id = Guid.NewGuid(),
                Firstname = NameGenerator.GenerateFirstName(),
                Lastname = NameGenerator.GenerateLastName()
            };

            bulkInsert.Store(person);
        }
    }
}
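For completeness (GetDocumentStore(), NameGenerator and recordsToCreate are just helpers in the test project; recordsToCreate was 2 million), the Person document is nothing more than a flat class matching the properties used above:

public class Person
{
    public Guid Id { get; set; }
    public string Firstname { get; set; }
    public string Lastname { get; set; }
}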
Subscribing to this collection is then a case of:
using (IDocumentStore store = GetDocumentStore())
{
    store.Initialize();

    var subscriptionId = store.Subscriptions.Create(new SubscriptionCriteria<Person>());

    var personSubscription = store.Subscriptions.Open<Person>(
        subscriptionId, new SubscriptionConnectionOptions()
        {
            BatchOptions = new SubscriptionBatchOptions()
            {
                // Max number of docs that can be sent in a single batch
                MaxDocCount = 16 * 1024,
                // Max total batch size in bytes
                MaxSize = 4 * 1024 * 1024,
                // Max time the subscription needs to confirm that the batch
                // has been successfully processed
                AcknowledgmentTimeout = TimeSpan.FromMinutes(3)
            },
            IgnoreSubscribersErrors = false,
            ClientAliveNotificationInterval = TimeSpan.FromSeconds(30)
        });

    personSubscription.Subscribe(new PersonObserver());

    // Keep the console process alive while documents are pushed to the observer
    while (true)
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(500));
    }
}
Note the PersonObserver; this is just a basic implementation of IObserver<Person>, like so:
public class PersonObserver : IObserver<Person>
{
    public void OnCompleted()
    {
        Console.WriteLine("Completed");
    }

    public void OnError(Exception error)
    {
        Console.WriteLine("Error occurred: " + error.ToString());
    }

    public void OnNext(Person person)
    {
        Console.WriteLine($"Received '{person.Firstname} {person.Lastname}'");
    }
}
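The property that matters for the original migration problem is that the subscription's progress lives on the server, so if the process dies part way through, reopening the same subscription id carries on after the last acknowledged batch. A sketch of that restart path (persisting the id to a file is purely illustrative; any durable storage would do):

// Create the subscription once and persist its id; on later runs,
// reopening with the same id resumes after the last acknowledged batch
var subscriptionId = File.Exists("subscription.id")
    ? long.Parse(File.ReadAllText("subscription.id"))
    : store.Subscriptions.Create(new SubscriptionCriteria<Person>());
File.WriteAllText("subscription.id", subscriptionId.ToString());

var personSubscription = store.Subscriptions.Open<Person>(
    subscriptionId, new SubscriptionConnectionOptions());
personSubscription.Subscribe(new PersonObserver());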