My goal is to create a kind of web crawler in Dart. For this I want to maintain a task queue that stores the elements that need to be crawled (e.g. URLs). The elements are crawled within the crawl function, which returns a List of more elements that need to be processed, so these elements are added to the queue. Example code:
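import 'dart:collection';
import 'package:http/http.dart' as http;

final queue = Queue<String>();

Future<void> main() async {
  queue
    ..add("...")
    ..add("...")
    ..add("...");
  // Sequential: each crawl is awaited before the next one starts.
  while (queue.isNotEmpty) {
    final results = await crawl(queue.removeFirst());
    queue.addAll(results);
  }
}

Future<List<String>> crawl(String x) async {
  // ...
  final res = await http.get(Uri.parse(x));
  final results = <String>[];
  // ... (fill results with the new URLs found in res)
  return results;
}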
This code of course only processes one element at a time. However, I want to have a pool of workers (for example 5) that take elements out of the queue, process them at the same time, and add the results back to the queue. Since the bottleneck is the HTTP request, I think a Future.wait() call with multiple workers could speed up the execution. However, I don't want to overload the servers, so I also want to limit the number of workers.
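The Future.wait() idea can be sketched as a batched loop: take up to N items off the queue, start them together, wait for the whole batch, then enqueue what they returned. This is a minimal sketch, assuming a hypothetical `fakeCrawl` that simulates a request with a short delay and a made-up link structure instead of doing real HTTP:

```dart
import 'dart:collection';

/// Hypothetical stand-in for crawl(): "fetches" [url] and returns the links
/// found on it. URLs with fewer than two '/' link to two child pages.
Future<List<String>> fakeCrawl(String url) async {
  await Future.delayed(const Duration(milliseconds: 10));
  final depth = '/'.allMatches(url).length;
  return depth < 2 ? ['$url/a', '$url/b'] : <String>[];
}

/// Crawls in batches with at most [batchSize] requests in flight.
/// Returns every URL that was visited, in visiting order.
Future<List<String>> crawlInBatches(List<String> seeds, int batchSize) async {
  final queue = Queue<String>.of(seeds);
  final visited = <String>[];
  while (queue.isNotEmpty) {
    // Take up to batchSize URLs off the front of the queue.
    final batch = <String>[];
    while (queue.isNotEmpty && batch.length < batchSize) {
      batch.add(queue.removeFirst());
    }
    visited.addAll(batch);
    // Start the whole batch at once and wait until all of it has finished.
    final results = await Future.wait(batch.map(fakeCrawl));
    for (final links in results) {
      queue.addAll(links);
    }
  }
  return visited;
}

Future<void> main() async {
  print(await crawlInBatches(['x'], 5));
}
```

The drawback of this design is that every batch waits for its slowest request, so finished slots sit idle until the batch completes; a pool in which each worker picks up the next item as soon as it is free avoids that.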
Can this be realized with basic async primitives and semaphores? I would like to avoid isolates if possible, to keep the solution as simple as possible.
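A counting semaphore can in fact be built from a `Completer` and a `Queue` alone, without isolates. The sketch below assumes only `dart:async` and `dart:collection`; the `Semaphore` class and the `fetchPage` function (a delay standing in for a real HTTP call) are hypothetical names for illustration, not part of any package:

```dart
import 'dart:async';
import 'dart:collection';
import 'dart:math';

/// Minimal counting semaphore built on Completer (hypothetical helper).
class Semaphore {
  Semaphore(this._permits);

  int _permits;
  final Queue<Completer<void>> _waiters = Queue();

  /// Completes once a permit has been granted to the caller.
  Future<void> acquire() {
    if (_permits > 0) {
      _permits--;
      return Future.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  /// Hands the permit to the oldest waiter, or returns it to the pool.
  void release() {
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete();
    } else {
      _permits++;
    }
  }
}

int _running = 0;
int peakRunning = 0;

/// Stand-in for an HTTP request that holds a permit while "fetching".
Future<String> fetchPage(int i, Semaphore sem) async {
  await sem.acquire();
  try {
    _running++;
    peakRunning = max(peakRunning, _running);
    await Future.delayed(const Duration(milliseconds: 20));
    return 'page $i';
  } finally {
    _running--;
    sem.release();
  }
}

Future<void> main() async {
  final sem = Semaphore(3);
  // All 10 calls are created at once, but at most 3 get past acquire().
  final pages =
      await Future.wait([for (var i = 0; i < 10; i++) fetchPage(i, sem)]);
  print('${pages.length} pages, peak concurrency $peakRunning');
}
```

The `try`/`finally` ensures the permit is released even if the request throws, so a failed request cannot permanently shrink the pool.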
I don't know whether a package already provides this functionality, but since it is not that complicated to write your own logic, I have made the following example:
import 'dart:async';
import 'dart:collection';
import 'dart:math';

class TaskRunner<A, B> {
  final Queue<A> _input = Queue();
  final StreamController<B> _streamController = StreamController();
  final Future<B> Function(A) task;
  final int maxConcurrentTasks;
  int runningTasks = 0;

  TaskRunner(this.task, {this.maxConcurrentTasks = 5});

  Stream<B> get stream => _streamController.stream;

  void add(A value) {
    _input.add(value);
    _startExecution();
  }

  void addAll(Iterable<A> iterable) {
    _input.addAll(iterable);
    _startExecution();
  }

  // Starts new workers while there is queued input and a free slot.
  void _startExecution() {
    if (runningTasks == maxConcurrentTasks || _input.isEmpty) {
      return;
    }
    while (_input.isNotEmpty && runningTasks < maxConcurrentTasks) {
      runningTasks++;
      print('Concurrent workers: $runningTasks');
      task(_input.removeFirst()).then((value) async {
        _streamController.add(value);
        // Keep draining the queue before this worker shuts down.
        while (_input.isNotEmpty) {
          _streamController.add(await task(_input.removeFirst()));
        }
        runningTasks--;
        print('Concurrent workers: $runningTasks');
      });
    }
  }
}
Random _rnd = Random();

// Simulated crawl: waits 0-4 seconds, then splits the input on '-'.
Future<List<String>> crawl(String x) =>
    Future.delayed(Duration(seconds: _rnd.nextInt(5)), () => x.split('-'));

void main() {
  final runner = TaskRunner(crawl, maxConcurrentTasks: 4);
  runner.stream.forEach((listOfString) {
    if (listOfString.length == 1) {
      print('DONE: ${listOfString.first}');
    } else {
      print('PUTTING STRINGS ON QUEUE: $listOfString');
      runner.addAll(listOfString);
    }
  });
  runner.addAll(['1-2-3-4-5-6-7-8-9', '10-20-30-40-50-60-70-80-90']);
}
Which outputs something like the following (the delays are random, so the order differs between runs):
Concurrent workers: 1
Concurrent workers: 2
Concurrent workers: 1
PUTTING STRINGS ON QUEUE: [1, 2, 3, 4, 5, 6, 7, 8, 9]
Concurrent workers: 2
Concurrent workers: 3
Concurrent workers: 4
PUTTING STRINGS ON QUEUE: [10, 20, 30, 40, 50, 60, 70, 80, 90]
DONE: 3
DONE: 5
DONE: 1
DONE: 2
DONE: 7
DONE: 4
DONE: 6
DONE: 10
DONE: 8
DONE: 9
DONE: 30
DONE: 20
DONE: 40
DONE: 50
Concurrent workers: 3
DONE: 90
Concurrent workers: 2
DONE: 60
Concurrent workers: 1
DONE: 80
Concurrent workers: 0
DONE: 70
I am sure the usability of the class can be improved, but I think the core concept is easy enough to understand. We define a Queue, and every time we add items to it we check whether we can start new async tasks. If we cannot, we simply skip starting one, because every currently running async task checks for more items in the Queue before "closing down".
The results are returned through a Stream, which you can subscribe to and, for example, use to add more content to the TaskRunner based on each result, as in my example. Results are emitted in the order the tasks finish, not the order they were added.
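One caveat with the TaskRunner above: if `task` throws, the `.then` callback never runs (or stops mid-loop), so `runningTasks` is never decremented and the error surfaces as an unhandled async error. A sketch of the same drain-the-queue idea written as an async worker loop that forwards failures to the stream with `addError` instead; the class name `SafeTaskRunner` and the `parseLater` demo task are hypothetical, not from the original answer:

```dart
import 'dart:async';
import 'dart:collection';

/// Variant of TaskRunner (hypothetical name) whose workers survive task
/// errors: failures are sent to the stream instead of leaking a slot.
class SafeTaskRunner<A, B> {
  SafeTaskRunner(this.task, {this.maxConcurrentTasks = 5});

  final Future<B> Function(A) task;
  final int maxConcurrentTasks;
  final Queue<A> _input = Queue();
  final StreamController<B> _controller = StreamController();
  int _running = 0;

  Stream<B> get stream => _controller.stream;

  void addAll(Iterable<A> values) {
    _input.addAll(values);
    // Start new workers only while there is work and a free slot.
    while (_input.isNotEmpty && _running < maxConcurrentTasks) {
      _running++;
      unawaited(_worker());
    }
  }

  /// Takes items until the queue is empty, then frees its slot.
  Future<void> _worker() async {
    while (_input.isNotEmpty) {
      try {
        _controller.add(await task(_input.removeFirst()));
      } catch (e, st) {
        _controller.addError(e, st);
      }
    }
    _running--;
  }
}

/// Demo task: throws a FormatException for non-numeric input.
Future<int> parseLater(String s) async {
  await Future.delayed(const Duration(milliseconds: 5));
  return int.parse(s);
}

void main() {
  final runner = SafeTaskRunner<String, int>(parseLater, maxConcurrentTasks: 2);
  runner.stream.listen(
    (value) => print('OK: $value'),
    onError: (Object e) => print('FAILED: $e'),
  );
  runner.addAll(['1', 'oops', '3', '4']);
}
```

Like the original, the stream is never closed; a real crawler would also need some rule for when the work is finished.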
It is important to note that this is NOT a way to run tasks in multiple threads. All the code runs in a single Dart isolate, but because HTTP requests are I/O-bound there is still a point in starting multiple Futures and waiting for their results.
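To see why concurrency on a single isolate still pays off for I/O waits, here is a small sketch in which `Future.delayed` stands in for an HTTP request (an assumption; `fakeRequest` is a hypothetical name). Three simulated 200 ms requests take at least 600 ms when awaited one after another, but roughly 200 ms when started together and awaited with `Future.wait`, without any extra threads:

```dart
/// Simulated I/O-bound request (stand-in for http.get).
Future<int> fakeRequest(int id) =>
    Future.delayed(const Duration(milliseconds: 200), () => id);

/// Awaits each request before starting the next one.
Future<Duration> timeSequential() async {
  final sw = Stopwatch()..start();
  for (var i = 0; i < 3; i++) {
    await fakeRequest(i);
  }
  return sw.elapsed;
}

/// Starts all requests first, then waits for them together.
Future<Duration> timeConcurrent() async {
  final sw = Stopwatch()..start();
  await Future.wait([for (var i = 0; i < 3; i++) fakeRequest(i)]);
  return sw.elapsed;
}

Future<void> main() async {
  print('sequential: ${(await timeSequential()).inMilliseconds} ms');
  print('concurrent: ${(await timeConcurrent()).inMilliseconds} ms');
}
```

The speedup comes only from overlapping the waiting; CPU-heavy work inside the tasks would still run one piece at a time on the isolate.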
Similar to the p-limit package in JavaScript, there is now a p_limit package for Dart. (Disclaimer: I am the author of the Dart implementation.) See the documentation.
Instantiate it with the number of concurrent tasks you want, then create your Futures by wrapping each call in the generated limit function, as below:
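The limit function takes a closure rather than a Future because a Dart Future starts running as soon as it is created; only by deferring the call can a limiter decide when to start it. The following is a hypothetical sketch of that idea, not p_limit's actual implementation: `runLimited` (an illustrative name) starts at most `concurrency` closures at a time and, like `Future.wait`, returns results in input order.

```dart
import 'dart:math';

/// Runs [thunks] with at most [concurrency] (assumed >= 1) in flight.
/// Results are returned in the same order as [thunks] (hypothetical helper).
Future<List<T>> runLimited<T>(
    List<Future<T> Function()> thunks, int concurrency) async {
  final results = List<T?>.filled(thunks.length, null);
  var next = 0;

  // Each worker claims the next unstarted index until none are left.
  Future<void> worker() async {
    while (next < thunks.length) {
      final index = next++;
      results[index] = await thunks[index]();
    }
  }

  final workerCount = min(concurrency, thunks.length);
  await Future.wait([for (var i = 0; i < workerCount; i++) worker()]);
  return [for (final r in results) r as T];
}

Future<void> main() async {
  // Each closure only starts its "request" when the limiter calls it.
  final thunks = [
    for (var i = 0; i < 5; i++)
      () => Future.delayed(Duration(milliseconds: 50 * (5 - i)), () => i * i),
  ];
  print(await runLimited(thunks, 2));
}
```

Because the index is claimed synchronously before each `await`, two workers never pick the same item, even though they share the `next` counter.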
import 'dart:collection';
import 'package:http/http.dart' as http;
import 'package:p_limit/p_limit.dart';

Future<void> main() async {
  // Example concurrency of 3 futures at once
  final limit = PLimit<http.Response>(3);
  final queue = Queue<String>();
  queue
    ..add("http://www.exampleone.com/")
    ..add("http://www.exampletwo.com/")
    ..add("http://www.examplethree.com/")
    ..add("http://www.examplefour.com/");
  final futures = queue.map((url) {
    // Wrap the call in the limit function defined above
    return limit(() => http.get(Uri.parse(url)));
  });
  // Only three futures are run at once (as defined above)
  final results = await Future.wait(futures);
  print(results);
}