
Stream.CopyToAsync with progress reporting - progress is reported even after copying finishes

I've built a simple console application that downloads files from the internet.
Because I had problems with WebClient, I decided to write my app using HttpClient.

Basically, I make a request that reads only the headers, then get the response stream with ReadAsStreamAsync and copy it to a local file using CopyToAsync.

I found an extension method for Stream that supports IProgress:

public static class StreamExtensions
{
    public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress, CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
    {
        var buffer = new byte[bufferSize];
        int bytesRead;
        long totalRead = 0;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            totalRead += bytesRead;
            //Thread.Sleep(10);
            progress.Report(totalRead);
        }
    }
}

My application works, but I get incorrect progress information.
For example, when downloading 2 files I see this in the output window:

file1.tmp 60.95%
file2.tmp 98.09%
file1.tmp 60.98%
file2.tmp 98.21%
file2.tmp 98.17%
file2.tmp 98.25%
file1.tmp 61.02%
file2.tmp 98.41%
file2.tmp downloaded
file2.tmp 98.29%
file2.tmp 98.37%
file1.tmp 61.06%
file2.tmp 89.27%
file2.tmp 89.31%
file2.tmp 98.33%
file2.tmp 98.45%
file2.tmp 98.48%
file1.tmp 61.10%
file1.tmp 61.14%
file2.tmp 98.52%
file1.tmp 61.22%
file2.tmp 98.60%
file2.tmp 98.56%
file1.tmp 61.30%
file2.tmp 98.88%
file2.tmp 90.44%
file1.tmp 61.53%
file2.tmp 98.72%
file1.tmp 61.41%
file1.tmp 61.73%
file2.tmp 98.80%
file1.tmp 61.26%
file1.tmp 61.49%
file1.tmp 61.57%
file1.tmp 61.69%
...
file1.tmp 99.31%
file1.tmp 98.84%
file1.tmp 98.80%
file1.tmp 99.04%
file1.tmp 99.43%
file1.tmp 99.12%
file1.tmp 99.00%
file1.tmp downloaded
file1.tmp 100.00%
file1.tmp 98.73%
file1.tmp 98.88%
file1.tmp 99.47%
file1.tmp 99.98%
file1.tmp 99.90%
file1.tmp 98.96%
file1.tmp 99.78%
file1.tmp 99.99%
file1.tmp 99.74%
file1.tmp 99.59%
file1.tmp 99.94%
file1.tmp 98.49%
file1.tmp 98.53%
ALL FILES DOWNLOADED
file1.tmp 99.55%
file1.tmp 98.41%
file1.tmp 99.62%
file1.tmp 98.34%
file1.tmp 99.66%
file1.tmp 98.69%
file1.tmp 98.37%

As you can see, I got the message that file2 is downloaded, but I'm still getting progress reports from CopyToAsync; the same happens with file1.

Because of that I sometimes get this weird console output:

[Image: screenshot of the console output]

Ideally I'd like to be sure that when I call:

await streamToReadFrom.CopyToAsync(streamToWriteTo, progress, source.Token,0x2000);
Debug.WriteLine(filename+" downloaded");

after I get that debug message, no further progress is reported (the file is downloaded). I thought that await would solve my problem, but it doesn't.

How can I fix this? As a temporary workaround I've added Thread.Sleep to CopyToAsync just before I report progress.

Below is my current code:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncDownloadTest
{
    class Program
    {
        private const string LocalPath = @"D:\TEMP";

        static void Main()
        {
            try
            {
                var filesToDownlad = new List<Tuple<string, string>>
                {
                    new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
                    new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
                };
                _consolePosition = -1;
                Console.CursorVisible = false;

                Parallel.ForEach(filesToDownlad, new ParallelOptions { MaxDegreeOfParallelism = 4 }, doc =>
                {
                    DownloadFile(doc.Item2,doc.Item1).Wait();
                });
                Debug.WriteLine("ALL FILES DOWNLOADED");
                Console.CursorVisible = true;    
            }
            catch (Exception e)
            {
                Console.WriteLine(e);
                Console.ReadLine();
            }
        }

        private static readonly object ConsoleLock = new object();
        private static int _consolePosition;

        static readonly CancellationTokenSource source = new CancellationTokenSource();

        private static async Task DownloadFile(string url, string filename)
        {
            int currenctLineNumber = 0;
            int currectProgress = 0;

            try
            {
                lock (ConsoleLock)
                {
                    _consolePosition++;
                    currenctLineNumber = _consolePosition;
                }

                long fileSize = -1;

                IProgress<long> progress = new Progress<long>(value =>
                {
                    decimal tmp = (decimal)(value * 100) / fileSize;

                    if (tmp != currectProgress && tmp > currectProgress)
                    {
                        lock (ConsoleLock)
                        {
                            currectProgress = (int)tmp;
                            Console.CursorTop = currenctLineNumber;
                            Console.CursorLeft = 0;
                            Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, tmp, "DOWNLOADING");
                        }
                        Debug.WriteLine("{1} {0:N2}%", tmp, filename);
                    }
                });

                using (HttpClient client = new HttpClient())
                {
                    using (HttpResponseMessage response = await client.GetAsync(url, HttpCompletionOption.ResponseHeadersRead, source.Token))
                    {
                        response.EnsureSuccessStatusCode();
                        if (response.Content.Headers.ContentLength.HasValue) fileSize = response.Content.Headers.ContentLength.Value;

                        if (response.Content.Headers.ContentDisposition != null)
                        {
                            var tmp = response.Content.Headers.ContentDisposition.FileName.Replace("\"", "");
                            Debug.WriteLine("Real name: {0}",tmp);
                        }

                        using (Stream streamToReadFrom = await response.Content.ReadAsStreamAsync())
                        {
                            using (Stream streamToWriteTo = File.Open(Path.Combine(LocalPath, filename), FileMode.Create, FileAccess.Write))
                            {
                                await streamToReadFrom.CopyToAsync(streamToWriteTo, progress, source.Token,0x2000);

                                Debug.WriteLine(filename+" downloaded");

                                lock (ConsoleLock)
                                {
                                    Console.CursorTop = currenctLineNumber;
                                    Console.CursorLeft = 0;
                                    var oldColor = Console.ForegroundColor;
                                    Console.ForegroundColor = ConsoleColor.Green;
                                    Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, 100, "SUCCESS");
                                    Console.ForegroundColor = oldColor;
                                }
                            }
                        }
                    }
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine(e.Message);
                lock (ConsoleLock)
                {
                    Console.CursorTop = currenctLineNumber;
                    Console.CursorLeft = 0;
                    var oldColor = Console.ForegroundColor;
                    Console.ForegroundColor = ConsoleColor.Red;
                    Console.Write("{0,10} - {2,11} - {1,6:N2}%", filename, currectProgress, "ERROR");
                    Console.ForegroundColor = oldColor;
                }
            }
        }
    }

    public static class StreamExtensions
    {
        public static async Task CopyToAsync(this Stream source, Stream destination, IProgress<long> progress, CancellationToken cancellationToken = default(CancellationToken), int bufferSize = 0x1000)
        {
            var buffer = new byte[bufferSize];
            int bytesRead;
            long totalRead = 0;
            while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length, cancellationToken)) > 0)
            {
                await destination.WriteAsync(buffer, 0, bytesRead, cancellationToken);
                cancellationToken.ThrowIfCancellationRequested();
                totalRead += bytesRead;
                Thread.Sleep(10);
                progress.Report(totalRead);
            }
        }
    }
}
asked Sep 28 '16 08:09 by Misiu


2 Answers

Your problem is actually here:

new Progress<long>

The Progress<T> class always invokes its callbacks on a SynchronizationContext, which in this case is the thread pool SynchronizationContext. This means that when the progress reporting code calls Report, it's just queueing the callback to the thread pool. So it's possible to see the callbacks run out of order (or still arrive a bit after the download has actually finished).
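The queueing behavior can be observed in isolation. The following is only a minimal sketch, assuming a plain console app with no SynchronizationContext installed; the names ProgressQueueDemo and Run are made up for illustration, and the exact interleaving of the printed lines will differ from run to run.

```csharp
using System;
using System.Threading;

public static class ProgressQueueDemo
{
    // Reports `count` values through Progress<T> and returns how many
    // callbacks have run once we stop waiting (at most 5 seconds).
    public static int Run(int count)
    {
        int handled = 0;

        // Progress<T> captures SynchronizationContext.Current when it is constructed.
        // With no context (console app), Report posts each callback to the thread pool.
        IProgress<int> progress = new Progress<int>(value =>
        {
            Interlocked.Increment(ref handled);
            Console.WriteLine("callback for {0} on thread {1}", value, Environment.CurrentManagedThreadId);
        });

        for (int i = 1; i <= count; i++)
        {
            progress.Report(i); // returns immediately; the callback runs later
        }

        // Some callbacks may not have run yet at this point.
        Console.WriteLine("loop finished; callbacks run so far: {0}", Volatile.Read(ref handled));

        // Give the queued callbacks time to drain.
        SpinWait.SpinUntil(() => Volatile.Read(ref handled) == count, TimeSpan.FromSeconds(5));
        return Volatile.Read(ref handled);
    }

    public static void Main()
    {
        Console.WriteLine("callbacks run eventually: {0}", Run(5));
    }
}
```

The "loop finished" line can appear before some or all of the callback lines, which is the same effect as the "downloaded" message appearing in the middle of the progress output in the question.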

To fix this, you can create your own custom implementation of IProgress<T>:

//C#6.0
public sealed class SynchronousProgress<T> : IProgress<T>
{
  private readonly Action<T> _callback;
  public SynchronousProgress(Action<T> callback) { _callback = callback; }
  void IProgress<T>.Report(T data) => _callback(data);
}
//older version
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _callback;

    public SynchronousProgress(Action<T> callback)
    {
        _callback = callback;
    }

    void IProgress<T>.Report(T data)
    {
        _callback(data);
    }
}

Then replace the line

IProgress<long> progress = new Progress<long>(value =>

with

IProgress<long> progress = new SynchronousProgress<long>(value =>
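To check that a synchronous IProgress<T> really delivers every report before the awaited copy returns, something like the sketch below could be used. It is an illustration under stated assumptions, not the original program: it copies between two MemoryStream instances instead of downloading, CopyWithProgressAsync is a simplified stand-in for the question's CopyToAsync extension (no cancellation token), and SynchronousProgressDemo and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Same shape as the SynchronousProgress<T> class shown in the answer.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> _callback;

    public SynchronousProgress(Action<T> callback)
    {
        _callback = callback;
    }

    // Runs the callback inline, on the thread that calls Report.
    void IProgress<T>.Report(T data)
    {
        _callback(data);
    }
}

public static class SynchronousProgressDemo
{
    // Simplified copy loop modelled on the question's extension method.
    private static async Task CopyWithProgressAsync(Stream source, Stream destination, IProgress<long> progress, int bufferSize)
    {
        var buffer = new byte[bufferSize];
        int bytesRead;
        long totalRead = 0;
        while ((bytesRead = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            await destination.WriteAsync(buffer, 0, bytesRead);
            totalRead += bytesRead;
            progress.Report(totalRead);
        }
    }

    // Copies 10000 bytes in 4096-byte chunks and returns every reported total.
    public static async Task<List<long>> RunAsync()
    {
        var reports = new List<long>();
        IProgress<long> progress = new SynchronousProgress<long>(value => reports.Add(value));

        using (var source = new MemoryStream(new byte[10000]))
        using (var destination = new MemoryStream())
        {
            await CopyWithProgressAsync(source, destination, progress, 4096);
        }

        // Every Report call has already run its callback by the time we get here.
        return reports;
    }

    public static void Main()
    {
        List<long> reports = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine(string.Join(", ", reports)); // prints: 4096, 8192, 10000
    }
}
```

One design consequence worth keeping in mind: the callback now runs inline on whichever thread performs the copy, so a slow callback slows the copy down, and any shared state it touches (such as the console in the question) still needs its own locking.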
answered Sep 20 '22 15:09 by Stephen Cleary


In the comments, the OP asked me to show how to write his program with TPL Dataflow. It is actually a rather simple conversion. First add a reference to the NuGet package System.Threading.Tasks.Dataflow and a using directive for the System.Threading.Tasks.Dataflow namespace. Then change your Main method to:

static void Main()
{
    try
    {
        var filesToDownlad = new List<Tuple<string, string>>
        {
            new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
            new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
        };
        _consolePosition = -1;
        Console.CursorVisible = false;

        var downloadBlock = new ActionBlock<Tuple<string, string>>(doc => DownloadFile(doc.Item2, doc.Item1),
                                                                   new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 4});

        foreach (var file in filesToDownlad)
        {
            downloadBlock.Post(file);
        }
        downloadBlock.Complete();
        downloadBlock.Completion.Wait();


        Debug.WriteLine("ALL FILES DOWNLOADED");
        Console.CursorVisible = true;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        Console.ReadLine();
    }
}

If you were doing this in a program with a synchronization context, and you wanted to await the posting and the completion instead of blocking synchronously, you could do:

static async Task Example()
{
    try
    {
        var filesToDownlad = new List<Tuple<string, string>>
        {
            new Tuple<string, string>("file1.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip"),
            new Tuple<string, string>("file2.tmp", "http://ipv4.download.thinkbroadband.com/10MB.zip")
        };
        _consolePosition = -1;
        Console.CursorVisible = false;

        var downloadBlock = new ActionBlock<Tuple<string, string>>(doc => DownloadFile(doc.Item2, doc.Item1),
                                                                   new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 4});

        foreach (var file in filesToDownlad)
        {
            await downloadBlock.SendAsync(file);
        }
        downloadBlock.Complete();
        await downloadBlock.Completion;


        Debug.WriteLine("ALL FILES DOWNLOADED");
        Console.CursorVisible = true;
    }
    catch (Exception e)
    {
        Console.WriteLine(e);
        Console.ReadLine();
    }
}

Note that this does not fix your problem of "ALL FILES DOWNLOADED" appearing early; you need to use Stephen's solution to fix that. All this does is fix a potential deadlock if this code were running in a situation where there could be a SynchronizationContext on the calling thread.

answered Sep 22 '22 15:09 by Scott Chamberlain