I have a producer-consumer scenario where the producer is an enumerable sequence of items (IEnumerable<Item>). I want to process these items in chunks/batches of 10 items each. So I decided to use the new (.NET 6) Chunk LINQ operator, as suggested in this question: Create batches in LINQ.
My problem is that sometimes the producer fails, and in this case the consumer of the chunkified sequence receives the error without first receiving a chunk with the last items that were produced before the error. So if for example the producer generates 15 items and then fails, the consumer will get a chunk with the items 1-10 and then will get an exception. The items 11-15 will be lost! Here is a minimal example that demonstrates this undesirable behavior:
static IEnumerable<int> Produce()
{
    int i = 0;
    while (true)
    {
        i++;
        Console.WriteLine($"Producing #{i}");
        yield return i;
        if (i == 15) throw new Exception("Oops!");
    }
}

// Consume
foreach (int[] chunk in Produce().Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
Output:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
Online demo.
The desirable behavior would be to get a chunk with the values [11, 12, 13, 14, 15] before getting the exception.
My question is: Is there any way to configure the Chunk operator so that it prioritizes emitting data instead of exceptions? If not, how can I implement a custom LINQ operator, named for example ChunkNonDestructive, with the desirable behavior?
public static IEnumerable<TSource[]> ChunkNonDestructive<TSource>(
    this IEnumerable<TSource> source, int size);
Note: Apart from the built-in Chunk operator (System.Linq), I also experimented with the Buffer operator from the System.Interactive package, as well as the Batch operator from the MoreLinq package. Apparently they all behave the same way (destructively).
Update: Here is the desirable output of the above example:
Producing #1
Producing #2
Producing #3
Producing #4
Producing #5
Producing #6
Producing #7
Producing #8
Producing #9
Producing #10
Consumed: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Producing #11
Producing #12
Producing #13
Producing #14
Producing #15
Consumed: [11, 12, 13, 14, 15]
Unhandled exception. System.Exception: Oops!
at Program.<Main>g__Produce|0_0()+MoveNext()
at System.Linq.Enumerable.ChunkIterator[TSource](IEnumerable`1 source, Int32 size)+MoveNext()
at Program.Main()
The difference is the line Consumed: [11, 12, 13, 14, 15], which is not present in the actual output.
If you preprocess your source to make it stop when it encounters an exception, then you can use Chunk() as-is.
public static class Extensions
{
    public static IEnumerable<T> UntilFirstException<T>(this IEnumerable<T> source, Action<Exception>? exceptionCallback = null)
    {
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            T current;
            try
            {
                if (!enumerator.MoveNext())
                {
                    break;
                }
                current = enumerator.Current;
            }
            catch (Exception e)
            {
                // Report the failure to the caller and end the sequence normally
                exceptionCallback?.Invoke(e);
                break;
            }
            // yield must stay outside the try/catch: C# forbids yield return inside it
            yield return current;
        }
    }
}

Exception? e = null;
foreach (int[] chunk in Produce().UntilFirstException(thrown => e = thrown).Chunk(10))
{
    Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
}
I feel like that keeps responsibilities separated nicely. If you want a helper that throws an exception instead of having to capture it yourself, you can use this as a component to simplify writing that helper:
public static IEnumerable<T[]> ChunkUntilFirstException<T>(this IEnumerable<T> source, int size)
{
    Exception? e = null;
    var result = source.UntilFirstException(thrown => e = thrown).Chunk(size);
    foreach (var element in result)
    {
        yield return element;
    }
    if (e != null)
    {
        throw new InvalidOperationException("source threw an exception", e);
    }
}
Note that this throws a different exception than the one emitted by the producer, with the original attached as its InnerException. This keeps the stack trace of the original exception intact, whereas throw e would overwrite that stack trace.
You can tweak this according to your needs. If you need to catch a specific type of exception that you're expecting your producer to emit, it's easy enough to use the when contextual keyword with some pattern matching.
try
{
    foreach (int[] chunk in Produce().ChunkUntilFirstException(10))
    {
        Console.WriteLine($"Consumed: [{String.Join(", ", chunk)}]");
    }
}
catch (InvalidOperationException e) when (e.InnerException is {Message: "Oops!"})
{
    Console.WriteLine(e.InnerException.ToString());
}
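If wrapping the failure in an InvalidOperationException is not acceptable, one possible variation is to capture the original exception with ExceptionDispatchInfo (from System.Runtime.ExceptionServices) and rethrow it after the last partial chunk has been emitted. The sketch below is only an illustration under a few assumptions: the operator name ChunkNonDestructive is taken from the signature requested in the question, the class name ChunkExtensions is made up, and argument validation is omitted for brevity.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;

// Hypothetical container class; the name is illustrative only.
public static class ChunkExtensions
{
    // Emits full chunks as they fill up. If the source throws, the partially
    // filled chunk is emitted first, and then the original exception is rethrown.
    public static IEnumerable<T[]> ChunkNonDestructive<T>(
        this IEnumerable<T> source, int size)
    {
        ExceptionDispatchInfo? captured = null;
        var buffer = new List<T>();
        using var enumerator = source.GetEnumerator();
        while (true)
        {
            try
            {
                if (!enumerator.MoveNext()) break;
                buffer.Add(enumerator.Current);
            }
            catch (Exception ex)
            {
                // Capture keeps the original stack trace for the later rethrow.
                captured = ExceptionDispatchInfo.Capture(ex);
                break;
            }
            if (buffer.Count == size)
            {
                // Hand out a copy so clearing the buffer cannot alter it.
                yield return buffer.ToArray();
                buffer.Clear();
            }
        }
        if (buffer.Count > 0) yield return buffer.ToArray();
        // Rethrows the producer's own exception object, if there was one.
        captured?.Throw();
    }
}
```

With the Produce method from the question, Produce().ChunkNonDestructive(10) should yield the chunk with items 1-10, then the chunk with items 11-15, and only then surface the original "Oops!" exception, so an existing catch block for the producer's exception type keeps working unchanged.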
First off, a matter of semantics. There's nothing destructive in Chunk or Buffer or anything else; each just reads items from a source enumerable until the source ends or throws an exception. The only destructive thing in your code is you throwing exceptions, which behaves as expected (i.e., it unwinds the stack out of your generator, out of the LINQ functions, and into a catch in your code, if one exists).
Also, it should be immediately obvious that every LINQ function will behave the same way with regard to exceptions. That is in fact how exceptions work, and working around them to support your use case is relatively expensive: you'll need to guard every item you pull from the source with a try/catch. This, in my humble opinion, is incredibly bad design, and you'd be fired on the spot if you worked for me and did that.
With all that out of the way, writing a BadDesignChunk like that is trivial (if expensive):
public static IEnumerable<IEnumerable<TSource>> BadDesignChunk<TSource>(this IEnumerable<TSource> source, int size)
{
    Exception? caughtException = null;
    var chunk = new List<TSource>();
    using var enumerator = source.GetEnumerator();
    while (true)
    {
        while (chunk.Count < size)
        {
            try
            {
                if (!enumerator.MoveNext())
                {
                    // end of the stream, send what we have and finish
                    goto end;
                }
            }
            catch (Exception ex)
            {
                // exception, send what we have and finish
                caughtException = ex;
                goto end;
            }
            chunk.Add(enumerator.Current);
        }
        // chunk full, send a copy so that clearing the buffer below
        // cannot empty a chunk the consumer is still holding
        yield return chunk.ToArray();
        chunk.Clear();
    }
end:
    if (chunk.Count > 0)
        yield return chunk.ToArray();
    if (caughtException is not null)
        throw caughtException;
}
See it in action here.