I am trying to write an HTTP server in Dart that can handle multiple requests in parallel. I have been unsuccessful at achieving the "parallel" part thus far.
Here is what I tried at first:
import 'dart:io';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
      request.response.statusCode = HttpStatus.OK;
      request.response.write(stopwatch.elapsedMilliseconds.toString());
      request.response.close().catchError(print);
    });
  });
}
On each request it does busy work for one second, then completes. I made it handle requests this way so that its timing would be predictable, and so I could easily see the effect of a request in Windows task manager (a CPU core jumping to 100% usage).
I can tell this is not handling requests in parallel because:
If I load http://example:8080/ in several browser tabs and then refresh them all, the tabs load one after another in sequence, approximately 1 second apart.
If I use the load-testing tool wrk with these settings:
    wrk -d 10 -c 8 -t 8 http://example:8080/
it completes 5 to 8 requests in the 10 seconds I gave it. If the server were using all 8 of my cores, I'd expect a number closer to 80 requests.
When I open the Windows task manager during the wrk test, I observe that only one of my cores is near 100% usage, and the rest are pretty much idle.
So, then I tried using isolates, hoping to manually spawn a new isolate/thread for each request:
import 'dart:io';
import 'dart:isolate';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      spawnFunction(handleRequest).send(request);
    });
  });
}

handleRequest() {
  port.receive((HttpRequest request, SendPort sender) {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
    request.response.statusCode = HttpStatus.OK;
    request.response.write(stopwatch.elapsedMilliseconds.toString());
    request.response.close().catchError(print);
  });
}
This does not work at all. It doesn't like that I'm trying to send an HttpRequest as the message to the isolate. Here is the error:
#0 _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
#1 _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
#2 _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
#3 main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
#4 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#5 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#6 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#7 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#8 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#9 StreamController.add (dart:async/stream_controller.dart:10:35)
#10 _HttpServer._handleRequest (http_impl.dart:1261:20)
#11 _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
#12 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#13 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#14 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#15 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#16 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#17 StreamController.add (dart:async/stream_controller.dart:10:35)
#18 _HttpParser._doParse (http_parser.dart:415:26)
#19 _HttpParser._parse (http_parser.dart:161:15)
#20 _HttpParser._onData._onData (http_parser.dart:509:11)
#21 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#22 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#23 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#24 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#25 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#26 StreamController.add (dart:async/stream_controller.dart:10:35)
#27 _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
#28 _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#29 _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#30 _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#31 _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#32 _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#33 StreamController.add (dart:async/stream_controller.dart:10:35)
#34 _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
#35 _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
#36 _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
#37 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Unhandled exception:
Illegal argument(s): Illegal argument in isolate message : (object is a closure)
#0 _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
#1 _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
#2 _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
#3 Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
#4 Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
#5 Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
#6 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
#7 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
#8 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
#9 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Versions used:
Is it possible to handle these requests in parallel, with all my machine's available cores, using Dart?
Even with the current HttpServer limitations, it is possible to utilise multiple cores by running multiple server processes behind a reverse proxy server like Apache or Nginx. From within Dart you can also start child processes to split out compute-intensive tasks.
A good place to start would be to read about scaling node.js, as this also uses a single thread per-process architecture.
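As a rough illustration of the child-process idea, here is a minimal sketch in which one script acts as both parent and worker. The `worker` argument, the `spinFor` helper, and the choice of 4 children are all made up for this example, and it assumes the script is started with the `dart` command-line tool so that `Platform.resolvedExecutable` points at the Dart binary.

```dart
import 'dart:io';

// Hypothetical CPU-bound task: spin for about [millis] ms, return elapsed ms.
int spinFor(int millis) {
  final stopwatch = Stopwatch()..start();
  while (stopwatch.elapsedMilliseconds < millis) {
    // Busy-wait to simulate real computation.
  }
  return stopwatch.elapsedMilliseconds;
}

Future<void> main(List<String> args) async {
  if (args.isNotEmpty && args.first == 'worker') {
    // Child mode: do the work and report the result on stdout.
    stdout.write(spinFor(1000));
    return;
  }

  // Parent mode: start 4 children at once, each in its own OS process,
  // so the operating system is free to schedule them on separate cores.
  final results = await Future.wait([
    for (var i = 0; i < 4; i++)
      Process.run(
        Platform.resolvedExecutable, // assumed to be the `dart` binary
        [Platform.script.toFilePath(), 'worker'],
      ),
  ]);
  for (final result in results) {
    print('child reported ${result.stdout} ms');
  }
}
```

Each child pays process start-up cost, so this probably only makes sense for tasks that run much longer than the time it takes to launch the Dart VM.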
Edit: This answer is now out of date. It is now possible to share requests between isolates, allowing a single Dart process to use multiple cores.
See the docs for ServerSocket.bind(shared).
"The optional argument shared specify whether additional binds to the same address, port and v6Only combination is possible from the same Dart process. If shared is true and additional binds are performed, then the incoming connections will be distributed between that set of ServerSockets. One way of using this is to have number of isolates between which incoming connections are distributed."
I wrote a library called dart-isoserver to do this a while back. It's severely bit rotted now, but you can see the approach.
https://code.google.com/p/dart-isoserver/
What I did was proxy HttpRequest and HttpResponse via isolate ports, since you cannot send them directly. It worked, though there were a few caveats:
spawnFunction() now has an uncaught exception handler parameter, so this is somewhat fixable, but spawnUri() doesn't. dart-isoserver used spawnUri() to implement hot-loading, so that would have to be removed.
A note about your first code example. That definitely won't run in parallel, as you noticed, because Dart is single-threaded. No Dart code in the same isolate ever runs concurrently.
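Since an HttpRequest cannot be sent to another isolate, one option is to keep the request in the main isolate and move only the CPU-bound work. The following is a minimal sketch of that idea, not the dart-isoserver approach: it assumes Dart 2.19 or later (for `Isolate.run`), and `busyWork`, `oneSecondOfWork`, and `handle` are names invented for this example.

```dart
import 'dart:async';
import 'dart:io';
import 'dart:isolate';

// Hypothetical CPU-bound task: spin for about [millis] ms, return elapsed ms.
int busyWork(int millis) {
  final stopwatch = Stopwatch()..start();
  while (stopwatch.elapsedMilliseconds < millis) {
    // Busy-wait to simulate real computation.
  }
  return stopwatch.elapsedMilliseconds;
}

// Top-level entry point, so nothing from the request handler is captured
// and sent across the isolate boundary.
int oneSecondOfWork() => busyWork(1000);

Future<void> main() async {
  final server = await HttpServer.bind(InternetAddress.anyIPv4, 8080);
  await for (final request in server) {
    // Deliberately not awaited, so the loop can accept the next request
    // while earlier ones are still being computed.
    unawaited(handle(request));
  }
}

Future<void> handle(HttpRequest request) async {
  // The busy work runs in a short-lived isolate; only the int result
  // comes back. The HttpRequest itself never leaves this isolate.
  final elapsed = await Isolate.run(oneSecondOfWork);
  request.response
    ..statusCode = HttpStatus.ok
    ..write(elapsed);
  await request.response.close();
}
```

This spawns a new isolate per request, which has its own overhead. For a real server a fixed pool of long-lived isolates would likely be a better fit.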
You need to set shared: true in HttpServer.bind.
Here's a barebones, minimal Dart server that distributes the incoming requests across 6 Isolates:
import 'dart:io';
import 'dart:isolate';

void main() async {
  // Spawn 5 extra isolates; with the main isolate that makes 6 in total.
  for (var i = 1; i < 6; i++) {
    Isolate.spawn(_startServer, []);
  }
  // Bind one server in current Isolate
  _startServer();
  print('Serving at http://localhost:8080/');
  await ProcessSignal.sigterm.watch().first;
}

// args is unused; it exists because Isolate.spawn passes one message to
// its entry point. It is nullable so the function can also be called
// with no argument under null safety.
void _startServer([List? args]) async {
  final server = await HttpServer.bind(
    InternetAddress.loopbackIPv4,
    8080,
    shared: true, // This is the magic sauce
  );
  await for (final request in server) {
    // Not awaited, so this isolate keeps accepting requests meanwhile.
    _handleRequest(request);
  }
}

void _handleRequest(HttpRequest request) async {
  // Fake delay
  await Future.delayed(const Duration(seconds: 2));
  request.response.write('Hello, world!');
  await request.response.close();
}