I have been trying to solve this "Concurrent Programming" exam exercise (in C#):
Knowing that the Stream class contains int Read(byte[] buffer, int offset, int size) and void Write(byte[] buffer, int offset, int size) methods, implement in C# the NetToFile method that copies all data received from the NetworkStream net instance to the FileStream file instance. To do the transfer, use asynchronous reads and synchronous writes, avoiding one thread to be blocked during read operations. The transfer ends when the net read operation returns value 0. To simplify, it is not necessary to support controlled cancel of the operation.

    void NetToFile(NetworkStream net, FileStream file);
I've been trying to solve this exercise, but I'm struggling with a doubt about the problem statement itself. But first, here is my code:
    public static void NetToFile(NetworkStream net, FileStream file)
    {
        byte[] buffer = new byte[4096]; // buffer with 4 kB dimension
        int nBytesRead;                 // number of bytes read on each cycle
        IAsyncResult ar;
        do
        {
            // read partial content of net (asynchronously);
            // the buffer is reused on every cycle, so the offset into it stays 0
            ar = net.BeginRead(buffer, 0, buffer.Length, null, null);
            // wait until read is completed
            ar.AsyncWaitHandle.WaitOne();
            // get number of bytes read on each cycle
            nBytesRead = net.EndRead(ar);
            // write partial content to file (synchronously);
            // the FileStream advances its own position after each write
            file.Write(buffer, 0, nBytesRead);
        } while (nBytesRead > 0);
    }
My doubt is that the problem statement says:

To do the transfer, use asynchronous reads and synchronous writes, avoiding one thread to be blocked during read operations

I'm not really sure my solution accomplishes what the exercise wants, because I'm using AsyncWaitHandle.WaitOne() to wait until the asynchronous read completes.

On the other hand, I can't figure out what a "non-blocking" solution is supposed to be in this scenario, as the FileStream write is meant to be made synchronously... and to do that, I have to wait until the NetworkStream read completes before proceeding with the FileStream write, don't I?

Can you please help me out with this?
[ EDIT 1 ] Using callback solution
Ok, if I understood what Mitchel Sellers and willvv replied, I've been counseled to use a callback method to turn this into a "non-blocking" solution. Here is my code, then:
    static byte[] buffer; // buffer (shared by every call, see below)

    public static void NetToFile(NetworkStream net, FileStream file)
    {
        // buffer with same dimension as file stream data
        buffer = new byte[file.Length];
        // start asynchronous read
        net.BeginRead(buffer, 0, buffer.Length, OnEndRead, net);
    }

    // asynchronous callback
    static void OnEndRead(IAsyncResult ar)
    {
        // NetworkStream retrieve
        NetworkStream net = (NetworkStream) ar.AsyncState;
        // get number of bytes read
        int nBytesRead = net.EndRead(ar);
        // write content to file
        // ... and now, how do I write to FileStream instance without
        // having its reference??
        // fs.Write(buffer, 0, nBytesRead);
    }
As you may have noticed, I'm stuck on the callback method, as I don't have a reference to the FileStream instance where I want to invoke the "Write(...)" method.

Additionally, this is not a thread-safe solution, as the byte[] field is exposed and may be shared among concurrent NetToFile invocations. I don't know how to solve this problem without exposing this byte[] field in the outer scope... and I'm almost sure it should not be exposed this way.

I don't want to use a lambda or anonymous method solution, because that's not in the curriculum of the "Concurrent Programming" course.
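Both problems described above (no FileStream reference in the callback, and a buffer shared between calls) are usually handled by passing a per-call state object through the last argument of BeginRead. What follows is only a minimal sketch under stated assumptions, not a reference solution: the CopyState class, the StartCopy helper and the Done event are hypothetical names introduced here for illustration, and error handling is left out.

```csharp
using System;
using System.IO;
using System.Net.Sockets;
using System.Threading;

public static class NetCopy
{
    // Hypothetical per-call state: each invocation gets its own streams
    // and buffer, so nothing is shared through a static field.
    private sealed class CopyState
    {
        public readonly Stream Net;
        public readonly Stream File;
        public readonly byte[] Buffer = new byte[4096];
        public readonly ManualResetEvent Done = new ManualResetEvent(false);

        public CopyState(Stream net, Stream file)
        {
            Net = net;
            File = file;
        }
    }

    // Signature from the exercise; returns as soon as the first read is queued.
    public static void NetToFile(NetworkStream net, FileStream file)
    {
        StartCopy(net, file);
    }

    // Assumption: a Stream-typed helper that also hands back a wait handle,
    // so a caller that cares can find out when the copy has finished.
    public static WaitHandle StartCopy(Stream net, Stream file)
    {
        CopyState state = new CopyState(net, file);
        // A named method is used as the callback: no lambda, no anonymous method.
        net.BeginRead(state.Buffer, 0, state.Buffer.Length, OnEndRead, state);
        return state.Done;
    }

    private static void OnEndRead(IAsyncResult ar)
    {
        // Recover everything this call needs from the state object.
        CopyState state = (CopyState)ar.AsyncState;
        int nBytesRead = state.Net.EndRead(ar);
        if (nBytesRead == 0)
        {
            // A read of 0 bytes means the transfer is over.
            state.Done.Set();
            return;
        }
        // Synchronous write, as the exercise asks.
        state.File.Write(state.Buffer, 0, nBytesRead);
        // Queue the next asynchronous read; no thread waits for it.
        state.Net.BeginRead(state.Buffer, 0, state.Buffer.Length, OnEndRead, state);
    }
}
```

No thread sits in WaitOne() while a read is pending; the callback runs only once data has arrived. Two caveats with this sketch: an exception inside the callback is not caught (so Done would never be set), and if a stream completes reads synchronously the callbacks can nest, which more careful code would guard against by checking ar.CompletedSynchronously.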
Even though it goes against the grain to help people with their homework, given that this is more than a year old, here's the proper way to accomplish this. All you need is to overlap your read/write operations; no spawning of additional threads or anything else is required.
    using System;
    using System.IO;
    using System.Threading.Tasks;

    public static class StreamExtensions
    {
        private const int DEFAULT_BUFFER_SIZE = short.MaxValue; // +32767

        public static void CopyTo(this Stream input, Stream output)
        {
            input.CopyTo(output, DEFAULT_BUFFER_SIZE);
        }

        public static void CopyTo(this Stream input, Stream output, int bufferSize)
        {
            if (!input.CanRead) throw new InvalidOperationException("input must be open for reading");
            if (!output.CanWrite) throw new InvalidOperationException("output must be open for writing");

            byte[][] buf = { new byte[bufferSize], new byte[bufferSize] };
            int[] bufl = { 0, 0 };
            int bufno = 0;
            IAsyncResult read = input.BeginRead(buf[bufno], 0, buf[bufno].Length, null, null);
            IAsyncResult write = null;

            while (true)
            {
                // wait for the read operation to complete
                read.AsyncWaitHandle.WaitOne();
                bufl[bufno] = input.EndRead(read);

                // if zero bytes read, the copy is complete
                if (bufl[bufno] == 0)
                {
                    break;
                }

                // wait for the in-flight write operation, if one exists, to complete
                // the only time one won't exist is after the very first read operation completes
                if (write != null)
                {
                    write.AsyncWaitHandle.WaitOne();
                    output.EndWrite(write);
                }

                // start the new write operation
                write = output.BeginWrite(buf[bufno], 0, bufl[bufno], null, null);

                // toggle the current, in-use buffer
                // and start the read operation on the new buffer.
                //
                // Changed to use XOR to toggle between 0 and 1.
                // A little speedier than using a ternary expression.
                bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
                read = input.BeginRead(buf[bufno], 0, buf[bufno].Length, null, null);
            }

            // wait for the final in-flight write operation, if one exists, to complete
            // the only time one won't exist is if the input stream is empty.
            if (write != null)
            {
                write.AsyncWaitHandle.WaitOne();
                output.EndWrite(write);
            }

            output.Flush();
        }

        public static async Task CopyToAsync(this Stream input, Stream output)
        {
            await input.CopyToAsync(output, DEFAULT_BUFFER_SIZE);
        }

        public static async Task CopyToAsync(this Stream input, Stream output, int bufferSize)
        {
            if (!input.CanRead) throw new InvalidOperationException("input must be open for reading");
            if (!output.CanWrite) throw new InvalidOperationException("output must be open for writing");

            byte[][] buf = { new byte[bufferSize], new byte[bufferSize] };
            int[] bufl = { 0, 0 };
            int bufno = 0;
            Task<int> read = input.ReadAsync(buf[bufno], 0, buf[bufno].Length);
            Task write = null;

            while (true)
            {
                await read;
                bufl[bufno] = read.Result;

                // if zero bytes read, the copy is complete
                if (bufl[bufno] == 0)
                {
                    break;
                }

                // wait for the in-flight write operation, if one exists, to complete
                // the only time one won't exist is after the very first read operation completes
                if (write != null)
                {
                    await write;
                }

                // start the new write operation
                write = output.WriteAsync(buf[bufno], 0, bufl[bufno]);

                // toggle the current, in-use buffer
                // and start the read operation on the new buffer.
                //
                // Changed to use XOR to toggle between 0 and 1.
                // A little speedier than using a ternary expression.
                bufno ^= 1; // bufno = ( bufno == 0 ? 1 : 0 ) ;
                read = input.ReadAsync(buf[bufno], 0, buf[bufno].Length);
            }

            // wait for the final in-flight write operation, if one exists, to complete
            // the only time one won't exist is if the input stream is empty.
            if (write != null)
            {
                await write;
            }

            output.Flush();
        }
    }
Cheers.
I doubt this is the fastest code (there's some overhead from the .NET Task abstraction) but I do think it's a cleaner approach to the whole async copy thing.
I needed a CopyTransformAsync where I could pass a delegate to do something as chunks were passed through the copy operation, e.g. compute a message digest while copying. That's why I got interested in rolling my own option.
Findings: the Serial test is clearly the fastest and most resource intensive.

Here's what I've found and the complete source code for the program I used to test this. On my machine, these tests were run on an SSD and are the equivalent of a file copy. Normally, you'd not want to use this for just copying files; it's when you have a network stream (which is my use case) that you'd want to use something like this.
    4K buffer
    Serial... in 0.474s
    CopyToAsync... timed out
    CopyToAsync (Asynchronous)... timed out
    CopyTransformAsync... timed out
    CopyTransformAsync (Asynchronous)... timed out

    8K buffer
    Serial... in 0.344s
    CopyToAsync... timed out
    CopyToAsync (Asynchronous)... timed out
    CopyTransformAsync... in 1.116s
    CopyTransformAsync (Asynchronous)... timed out

    40K buffer
    Serial... in 0.195s
    CopyToAsync... in 0.624s
    CopyToAsync (Asynchronous)... timed out
    CopyTransformAsync... in 0.378s
    CopyTransformAsync (Asynchronous)... timed out

    80K buffer
    Serial... in 0.190s
    CopyToAsync... in 0.355s
    CopyToAsync (Asynchronous)... in 1.196s
    CopyTransformAsync... in 0.300s
    CopyTransformAsync (Asynchronous)... in 0.886s

    160K buffer
    Serial... in 0.432s
    CopyToAsync... in 0.252s
    CopyToAsync (Asynchronous)... in 0.454s
    CopyTransformAsync... in 0.447s
    CopyTransformAsync (Asynchronous)... in 0.555s
Here you can see the Process Explorer performance graph as the test is run. Basically, each peak (in the lowest of the three graphs) is the start of the serial test. You can clearly see how the throughput increases dramatically as the buffer size grows. It appears to level off somewhere around 80K, which is what the .NET Framework CopyToAsync method uses internally.
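For comparison with the hand-rolled versions, the built-in copy can be given the same buffer size explicitly. This is just a small sketch: the BclCopy class and the BufferSize constant are names made up for this example, and the 80K value simply mirrors the figure mentioned above.

```csharp
using System.IO;
using System.Threading.Tasks;

public static class BclCopy
{
    // 80 KB, matching the buffer size discussed above (an assumption of
    // this sketch, passed explicitly rather than relying on a default).
    public const int BufferSize = 80 * 1024;

    // Thin wrapper over the framework's own Stream.CopyToAsync(Stream, int).
    public static Task CopyAsync(Stream input, Stream output)
    {
        return input.CopyToAsync(output, BufferSize);
    }
}
```

Swapping BufferSize for the other sizes in the results above would give the baseline each custom implementation is measured against.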
The nice thing here is that the final implementation wasn't that complicated:
    // Note: bufferSize is not declared in this snippet; it is assumed to be
    // defined elsewhere in the test program.
    static Task CompletedTask = ((Task)Task.FromResult(0));

    static async Task CopyTransformAsync(
        Stream inputStream,
        Stream outputStream,
        Func<ArraySegment<byte>, ArraySegment<byte>> transform = null)
    {
        var temp = new byte[bufferSize];
        var temp2 = new byte[bufferSize];
        int i = 0;

        var readTask = inputStream
            .ReadAsync(temp, 0, bufferSize)
            .ConfigureAwait(false);
        var writeTask = CompletedTask.ConfigureAwait(false);

        for (; ; )
        {
            // synchronize read
            int read = await readTask;
            if (read == 0)
            {
                break;
            }

            if (i++ > 0)
            {
                // synchronize write
                await writeTask;
            }

            var chunk = new ArraySegment<byte>(temp, 0, read);

            // do transform (if any)
            if (transform != null)
            {
                chunk = transform(chunk);
            }

            // queue write
            writeTask = outputStream
                .WriteAsync(chunk.Array, chunk.Offset, chunk.Count)
                .ConfigureAwait(false);

            // queue read
            readTask = inputStream
                .ReadAsync(temp2, 0, bufferSize)
                .ConfigureAwait(false);

            // swap buffer
            var temp3 = temp;
            temp = temp2;
            temp2 = temp3;
        }

        await writeTask; // complete any lingering write task
    }
This method of interleaving the read/write, despite the huge buffers, is somewhere around 18% faster than the BCL CopyToAsync.
Out of curiosity, I did change the async calls to typical begin/end async pattern calls, and that did not improve the situation one bit; it made it worse. For all I like to bash on the Task abstraction overhead, they do some nifty things when you write your code with the async/await keywords, and it is much nicer to read that code!
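As for the message digest use case mentioned earlier, a transform with the Func<ArraySegment<byte>, ArraySegment<byte>> shape could look roughly like this. It is a sketch under assumptions: DigestTap is a hypothetical helper written for this example (not part of the test program), and SHA-256 is an arbitrary choice of algorithm.

```csharp
using System;
using System.Security.Cryptography;

// Hypothetical helper: feeds every chunk that passes through the copy
// into an incremental SHA-256 computation and returns the chunk unchanged.
public sealed class DigestTap : IDisposable
{
    private readonly HashAlgorithm _hash = SHA256.Create();

    // Has the Func<ArraySegment<byte>, ArraySegment<byte>> shape.
    public ArraySegment<byte> Transform(ArraySegment<byte> chunk)
    {
        // Passing null as the output buffer: only the hash state is updated.
        _hash.TransformBlock(chunk.Array, chunk.Offset, chunk.Count, null, 0);
        return chunk;
    }

    // Call once after the copy has completed to obtain the digest.
    public byte[] Finish()
    {
        _hash.TransformFinalBlock(new byte[0], 0, 0);
        return _hash.Hash;
    }

    public void Dispose()
    {
        _hash.Dispose();
    }
}
```

With the method above it would be passed as a method group, along the lines of await CopyTransformAsync(input, output, tap.Transform), followed by tap.Finish() once the copy has completed.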