I'm trying to make a program like IDM, that can download parts of the file simultaneously.
The tool i'm using to achieve this is TPL in C# .Net4.5
But I'm having a problem when using Tasks
to make the operation parallel.
The sequence function is functioning well and it is downloading the files correctly.
The parallel function using Tasks is working until something weird happens:
I've created 4 tasks, with Factory.StartNew()
, in each task the start position and the end position are given, the task will download these files, then it'll return it in byte[], and everything is going well, the tasks are working fine, but at some point, the executing freezes and that's it, the program stops and nothing else happens.
the implementation of the parallel function:
static void DownloadPartsParallel()
{
string uriPath = "http://mschnlnine.vo.llnwd.net/d1/pdc08/PPTX/BB01.pptx";
Uri uri = new Uri(uriPath);
long l = GetFileSize(uri);
Console.WriteLine("Size={0}", l);
int granularity = 4;
byte[][] arr = new byte[granularity][];
Task<byte[]>[] tasks = new Task<byte[]>[granularity];
tasks[0] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, 0, l / granularity));
tasks[1] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, l / granularity + 1, l / granularity + l / granularity));
tasks[2] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, l / granularity + l / granularity + 1, l / granularity + l / granularity + l / granularity));
tasks[3] = Task<byte[]>.Factory.StartNew(() => DownloadPartOfFile(uri, l / granularity + l / granularity + l / granularity + 1, l));//(l / granularity) + (l / granularity) + (l / granularity) + (l / granularity)
arr[0] = tasks[0].Result;
arr[1] = tasks[1].Result;
arr[2] = tasks[2].Result;
arr[3] = tasks[3].Result;
Stream localStream;
localStream = File.Create("E:\\a\\" + Path.GetFileName(uri.LocalPath));
for (int i = 0; i < granularity; i++)
{
if (i == granularity - 1)
{
for (int j = 0; j < arr[i].Length - 1; j++)
{
localStream.WriteByte(arr[i][j]);
}
}
else
for (int j = 0; j < arr[i].Length; j++)
{
localStream.WriteByte(arr[i][j]);
}
}
}
the DownloadPartOfFile function implementation:
public static byte[] DownloadPartOfFile(Uri fileUrl, long from, long to)
{
int bytesProcessed = 0;
BinaryReader reader = null;
WebResponse response = null;
byte[] bytes = new byte[(to - from) + 1];
try
{
HttpWebRequest request = (HttpWebRequest)WebRequest.Create(fileUrl);
request.AddRange(from, to);
request.ReadWriteTimeout = int.MaxValue;
request.Timeout = int.MaxValue;
if (request != null)
{
response = request.GetResponse();
if (response != null)
{
reader = new BinaryReader(response.GetResponseStream());
int bytesRead;
do
{
byte[] buffer = new byte[1024];
bytesRead = reader.Read(buffer, 0, buffer.Length);
if (bytesRead == 0)
{
break;
}
Array.Resize<byte>(ref buffer, bytesRead);
buffer.CopyTo(bytes, bytesProcessed);
bytesProcessed += bytesRead;
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + ",Downloading" + bytesProcessed);
} while (bytesRead > 0);
}
}
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
finally
{
if (response != null) response.Close();
if (reader != null) reader.Close();
}
return bytes;
}
I tried to solve it by setting int.MaxValue to the reading timeout, writing reading timeout, and timeout, that's why the program freezes, if i didn't do that, an exception of timeout will occur while in function DownloadPartsParallel
so is there a solution, or any other advice that may help, thanks.
I would use HttpClient.SendAsync
rather than WebRequest
(see "HttpClient is Here!").
I would not use any extra threads. The HttpClient.SendAsync
API is naturally asynchronous and returns an awaitable Task<>
, there is no need to offload it to a pool thread with Task.Run
/Task.TaskFactory.StartNew
(see this for a detailed discussion).
I would also limit the number of parallel downloads with SemaphoreSlim.WaitAsync()
. Below is my take as a console app (not extensively tested):
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
namespace Console_21737681
{
class Program
{
const int MAX_PARALLEL = 4; // max parallel downloads
const int CHUNK_SIZE = 2048; // size of a single chunk
// a chunk of downloaded data
class Chunk
{
public long Start { get; set; }
public int Length { get; set; }
public byte[] Data { get; set; }
};
// throttle downloads
SemaphoreSlim _throttleSemaphore = new SemaphoreSlim(MAX_PARALLEL);
// get a chunk
async Task<Chunk> GetChunk(HttpClient client, long start, int length, string url)
{
await _throttleSemaphore.WaitAsync();
try
{
using (var request = new HttpRequestMessage(HttpMethod.Get, url))
{
request.Headers.Range = new System.Net.Http.Headers.RangeHeaderValue(start, start + length - 1);
using (var response = await client.SendAsync(request))
{
var data = await response.Content.ReadAsByteArrayAsync();
return new Chunk { Start = start, Length = length/*, Data = data*/ };
}
}
}
finally
{
_throttleSemaphore.Release();
}
}
// download the URL in parallel by chunks
async Task<Chunk[]> DownloadAsync(string url)
{
using (var client = new HttpClient())
{
var request = new HttpRequestMessage(HttpMethod.Head, url);
var response = await client.SendAsync(request);
var contentLength = response.Content.Headers.ContentLength;
if (!contentLength.HasValue)
throw new InvalidOperationException("ContentLength");
var numOfChunks = (int)((contentLength.Value + CHUNK_SIZE - 1) / CHUNK_SIZE);
var tasks = Enumerable.Range(0, numOfChunks).Select(i =>
{
// start a new chunk
long start = i * CHUNK_SIZE;
var length = (int)Math.Min(CHUNK_SIZE, contentLength.Value - start);
return GetChunk(client, start, length, url);
}).ToList();
await Task.WhenAll(tasks);
// the order of chunks is random
return tasks.Select(task => task.Result).ToArray();
}
}
static void Main(string[] args)
{
var program = new Program();
var chunks = program.DownloadAsync("http://flaglane.com/download/australian-flag/australian-flag-large.png").Result;
Console.WriteLine("Chunks: " + chunks.Count());
Console.ReadLine();
}
}
}
OK, here's how I would do what you're attempting. This is basically the same idea, just implemented differently.
public static void DownloadFileInPiecesAndSave()
{
//test
var uri = new Uri("http://www.w3.org/");
var bytes = DownloadInPieces(uri, 4);
File.WriteAllBytes(@"c:\temp\RangeDownloadSample.html", bytes);
}
/// <summary>
/// Donwload a file via HTTP in multiple pieces using a Range request.
/// </summary>
public static byte[] DownloadInPieces(Uri uri, uint numberOfPieces)
{
//I'm just fudging this for expository purposes. In reality you would probably want to do a HEAD request to get total file size.
ulong totalFileSize = 1003;
var pieceSize = totalFileSize / numberOfPieces;
List<Task<byte[]>> tasks = new List<Task<byte[]>>();
for (uint i = 0; i < numberOfPieces; i++)
{
var start = i * pieceSize;
var end = start + (i == numberOfPieces - 1 ? pieceSize + totalFileSize % numberOfPieces : pieceSize);
tasks.Add(DownloadFilePiece(uri, start, end));
}
Task.WaitAll(tasks.ToArray());
//This is probably not the single most efficient way to combine byte arrays, but it is succinct...
return tasks.SelectMany(t => t.Result).ToArray();
}
private static async Task<byte[]> DownloadFilePiece(Uri uri, ulong rangeStart, ulong rangeEnd)
{
try
{
var request = (HttpWebRequest)WebRequest.Create(uri);
request.AddRange((long)rangeStart, (long)rangeEnd);
request.Proxy = WebProxy.GetDefaultProxy();
using (var response = await request.GetResponseAsync())
using (var responseStream = response.GetResponseStream())
using (var memoryStream = new MemoryStream((int)(rangeEnd - rangeStart)))
{
await responseStream.CopyToAsync(memoryStream);
return memoryStream.ToArray();
}
}
catch (WebException wex)
{
//Do lots of error handling here, lots of things can go wrong
//In particular watch for 416 Requested Range Not Satisfiable
return null;
}
catch (Exception ex)
{
//handle the unexpected here...
return null;
}
}
Note that I glossed over a lot of stuff here, such as:
So you've got a long way to go before I would use this in production. But it should give you an idea of where to start.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With