How to download parts of a file in parallel using HttpWebRequest

I'm trying to make a program like IDM that can download parts of a file simultaneously.
The tool I'm using is the TPL in C# (.NET 4.5).
But I'm having a problem when using Tasks to make the operation parallel.
The sequential version works well and downloads the file correctly.
The parallel version using Tasks works until something weird happens: I create 4 tasks with Factory.StartNew(); each task is given a start position and an end position, downloads that part of the file, and returns it as a byte[]. The tasks work fine, but at some point execution freezes and that's it; the program stops and nothing else happens.

The implementation of the parallel function:

static void DownloadPartsParallel()
{
    string uriPath = "http://mschnlnine.vo.llnwd.net/d1/pdc08/PPTX/BB01.pptx";
    Uri uri = new Uri(uriPath);
    long l = GetFileSize(uri);
    Console.WriteLine("Size={0}", l);
    int granularity = 4;
    byte[][] arr = new byte[granularity][];
    Task<byte[]>[] tasks = new Task<byte[]>[granularity];

    // split the file into 4 consecutive ranges, one task per range
    tasks[0] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, 0, l / granularity));
    tasks[1] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, l / granularity + 1, 2 * (l / granularity)));
    tasks[2] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, 2 * (l / granularity) + 1, 3 * (l / granularity)));
    tasks[3] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, 3 * (l / granularity) + 1, l));

    // Result blocks until each task completes
    arr[0] = tasks[0].Result;
    arr[1] = tasks[1].Result;
    arr[2] = tasks[2].Result;
    arr[3] = tasks[3].Result;

    // dispose the stream so all bytes are flushed to disk
    using (Stream localStream = File.Create("E:\\a\\" + Path.GetFileName(uri.LocalPath)))
    {
        for (int i = 0; i < granularity; i++)
        {
            // the last range asks for one byte past the end of the file,
            // so its buffer carries a trailing zero byte that must be dropped
            int length = (i == granularity - 1) ? arr[i].Length - 1 : arr[i].Length;
            localStream.Write(arr[i], 0, length);
        }
    }
}

The implementation of the DownloadPartOfFile function:

public static byte[] DownloadPartOfFile(Uri fileUrl, long from, long to)
{
    int bytesProcessed = 0;
    BinaryReader reader = null;
    WebResponse response = null;
    byte[] bytes = new byte[(to - from) + 1];

    try
    {
        HttpWebRequest request = (HttpWebRequest)WebRequest.Create(fileUrl);
        request.AddRange(from, to); // request only the given byte range
        request.ReadWriteTimeout = int.MaxValue;
        request.Timeout = int.MaxValue;

        response = request.GetResponse();
        reader = new BinaryReader(response.GetResponseStream());
        byte[] buffer = new byte[1024];
        int bytesRead;
        while ((bytesRead = reader.Read(buffer, 0, buffer.Length)) > 0)
        {
            // copy only the bytes actually read into the result array
            Buffer.BlockCopy(buffer, 0, bytes, bytesProcessed, bytesRead);
            bytesProcessed += bytesRead;
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ",Downloading" + bytesProcessed);
        }
    }
    catch (Exception e)
    {
        Console.WriteLine(e.Message);
    }
    finally
    {
        if (reader != null) reader.Close();
        if (response != null) response.Close();
    }

    return bytes;
}

I tried to work around this by setting ReadWriteTimeout and Timeout to int.MaxValue; without that, a timeout exception is thrown inside DownloadPartsParallel, and with it the program just freezes instead.
So is there a solution, or any other advice that may help? Thanks.

asked Feb 12 '14 by Tarek

2 Answers

I would use HttpClient.SendAsync rather than WebRequest (see "HttpClient is Here!").

I would not use any extra threads. The HttpClient.SendAsync API is naturally asynchronous and returns an awaitable Task<>; there is no need to offload it to a pool thread with Task.Run/Task.Factory.StartNew (see this for a detailed discussion).
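
For example, a minimal sketch of the difference (the URL is just a placeholder, and this assumes it runs inside an async method):

// no Task.Run/StartNew needed: await releases the thread while the I/O is in flight
using (var client = new HttpClient())
{
    byte[] data = await client.GetByteArrayAsync("http://example.com/file.bin");
    Console.WriteLine("Downloaded {0} bytes", data.Length);
}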

I would also limit the number of parallel downloads with SemaphoreSlim.WaitAsync(). Below is my take as a console app (not extensively tested):

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21737681
{
    class Program
    {
        const int MAX_PARALLEL = 4; // max parallel downloads
        const int CHUNK_SIZE = 2048; // size of a single chunk

        // a chunk of downloaded data
        class Chunk
        {
            public long Start { get; set; }
            public int Length { get; set; }
            public byte[] Data { get; set; }
        };

        // throttle downloads
        SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(MAX_PARALLEL);

        // get a chunk
        async Task<Chunk> GetChunk(HttpClient client, long start, int length, string url)
        {
            await _throttleSemaphore.WaitAsync();
            try
            {
                using (var request = new HttpRequestMessage(HttpMethod.Get, url))
                {
                    request.Headers.Range = new System.Net.Http.Headers.RangeHeaderValue(start, start + length - 1);
                    using (var response = await client.SendAsync(request))
                    {
                        var data = await response.Content.ReadAsByteArrayAsync();
                        return new Chunk { Start = start, Length = length, Data = data };
                    }
                }
            }
            finally
            {
                _throttleSemaphore.Release();
            }
        }

        // download the URL in parallel by chunks
        async Task<Chunk[]> DownloadAsync(string url)
        {
            using (var client = new HttpClient())
            {
                var request = new HttpRequestMessage(HttpMethod.Head, url);
                var response = await client.SendAsync(request);
                var contentLength = response.Content.Headers.ContentLength;

                if (!contentLength.HasValue)
                    throw new InvalidOperationException("ContentLength");

                var numOfChunks = (int)((contentLength.Value + CHUNK_SIZE - 1) / CHUNK_SIZE);

                var tasks = Enumerable.Range(0, numOfChunks).Select(i =>
                {
                    // start a new chunk
                    long start = i * CHUNK_SIZE;
                    var length = (int)Math.Min(CHUNK_SIZE, contentLength.Value - start);
                    return GetChunk(client, start, length, url);
                }).ToList();

                await Task.WhenAll(tasks);

                // the order of chunks is random
                return tasks.Select(task => task.Result).ToArray();
            }
        }

        static void Main(string[] args)
        {
            var program = new Program();
            var chunks = program.DownloadAsync("http://flaglane.com/download/australian-flag/australian-flag-large.png").Result;

            Console.WriteLine("Chunks: " + chunks.Count());
            Console.ReadLine();
        }
    }
}
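
Since the chunks complete in arbitrary order, the caller still has to reassemble them by offset before writing the file. A minimal sketch of that step (SaveChunks is a hypothetical helper; it assumes Chunk.Data was populated as above):

// order the chunks by their start offset and write them out sequentially
static void SaveChunks(string path, Chunk[] chunks)
{
    using (var output = System.IO.File.Create(path))
    {
        foreach (var chunk in chunks.OrderBy(c => c.Start))
            output.Write(chunk.Data, 0, chunk.Data.Length);
    }
}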
answered by noseratio


OK, here's how I would do what you're attempting. This is basically the same idea, just implemented differently.

public static void DownloadFileInPiecesAndSave()
{
    //test
    var uri = new Uri("http://www.w3.org/");

    var bytes = DownloadInPieces(uri, 4);
    File.WriteAllBytes(@"c:\temp\RangeDownloadSample.html", bytes);
}

/// <summary>
/// Donwload a file via HTTP in multiple pieces using a Range request.
/// </summary>
public static byte[] DownloadInPieces(Uri uri, uint numberOfPieces)
{
    //I'm just fudging this for expository purposes. In reality you would probably want to do a HEAD request to get total file size.
    ulong totalFileSize = 1003; 

    var pieceSize = totalFileSize / numberOfPieces;

    List<Task<byte[]>> tasks = new List<Task<byte[]>>();
    for (uint i = 0; i < numberOfPieces; i++)
    {
        var start = i * pieceSize;
        // AddRange treats the range as inclusive, so end one byte early;
        // the last piece also absorbs the remainder
        var end = start + pieceSize - 1 + (i == numberOfPieces - 1 ? totalFileSize % numberOfPieces : 0);
        tasks.Add(DownloadFilePiece(uri, start, end));
    }

    Task.WaitAll(tasks.ToArray());

    //This is probably not the single most efficient way to combine byte arrays, but it is succinct...
    return tasks.SelectMany(t => t.Result).ToArray();
}

private static async Task<byte[]> DownloadFilePiece(Uri uri, ulong rangeStart, ulong rangeEnd)
{
    try
    {
        var request = (HttpWebRequest)WebRequest.Create(uri);
        request.AddRange((long)rangeStart, (long)rangeEnd);
        request.Proxy = WebRequest.DefaultWebProxy; // WebProxy.GetDefaultProxy() is obsolete

        using (var response = await request.GetResponseAsync())
        using (var responseStream = response.GetResponseStream())
        using (var memoryStream = new MemoryStream((int)(rangeEnd - rangeStart + 1))) // the range is inclusive
        {
            await responseStream.CopyToAsync(memoryStream);
            return memoryStream.ToArray();
        }
    }
    catch (WebException wex)
    {
        //Do lots of error handling here, lots of things can go wrong
        //In particular watch for 416 Requested Range Not Satisfiable
        return null;
    }
    catch (Exception ex)
    {
        //handle the unexpected here...
        return null;
    }
}

Note that I glossed over a lot of stuff here, such as:

  • Detecting whether the server supports range requests (see the HEAD-request sketch after this list). If it doesn't, the server will return the entire content in each request, and we'll get several copies of it.
  • Handling any sort of HTTP errors. What if the third request fails?
  • Retry logic (a minimal sketch follows at the end)
  • Timeouts
  • Figuring out how big the file actually is (the same HEAD request can tell you)
  • Checking whether the file is big enough to warrant multiple requests, and if so, how many. It's probably not worth parallelizing for files under 1 or 2 MB, but you'd have to test.
  • Most likely a bunch of other stuff.
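
For the first and fifth points, here is a minimal sketch of probing the server with a HEAD request (ProbeServer is a hypothetical helper; servers vary in whether and how they report Accept-Ranges):

// learn the file size and whether the server advertises byte-range support
private static async Task<Tuple<long, bool>> ProbeServer(Uri uri)
{
    var request = (HttpWebRequest)WebRequest.Create(uri);
    request.Method = "HEAD";
    using (var response = (HttpWebResponse)await request.GetResponseAsync())
    {
        long size = response.ContentLength; // -1 if the server didn't report it
        bool acceptsRanges = string.Equals(
            response.Headers["Accept-Ranges"], "bytes", StringComparison.OrdinalIgnoreCase);
        return Tuple.Create(size, acceptsRanges);
    }
}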

So you've got a long way to go before I would use this in production. But it should give you an idea of where to start.
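
For retries, one simple approach (WithRetries is a hypothetical helper, not production-grade; real code would distinguish transient failures from permanent ones):

// retry an async operation a few times, waiting between attempts
private static async Task<T> WithRetries<T>(Func<Task<T>> operation, int maxAttempts = 3)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return await operation();
        }
        catch (WebException)
        {
            if (attempt >= maxAttempts) throw;
        }
        await Task.Delay(TimeSpan.FromSeconds(1)); // only reached after a swallowed failure
    }
}

Usage would look like: var piece = await WithRetries(() => DownloadFilePiece(uri, start, end));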

answered by Brian Reischl