Dart: handle incoming HTTP requests in parallel

I am trying to write an HTTP server in Dart that can handle multiple requests in parallel. I have been unsuccessful at achieving the "parallel" part thus far.

Here is what I tried at first:

import 'dart:io';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      Stopwatch stopwatch = new Stopwatch();
      stopwatch.start();
      while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
      request.response.statusCode = HttpStatus.OK;
      request.response.write(stopwatch.elapsedMilliseconds.toString());
      request.response.close().catchError(print);
    });
  });
}

On each request it does busy work for one second, then completes. I made it handle requests this way so that its timing would be predictable, and so I could easily see the effect of a request in Windows task manager (a CPU core jumping to 100% usage).

I can tell this is not handling requests in parallel because:

  1. If I load up several browser tabs to http://example:8080/ and then refresh them all, the tabs load one after another in sequence, approximately 1 second between each.

  2. If I use the load-testing tool wrk with these settings... wrk -d 10 -c 8 -t 8 http://example:8080/ ...it completes 5 to 8 requests in the 10 seconds I gave it. If the server were using all 8 of my cores, I'd expect a number closer to 80 requests.

  3. When I open the Windows task manager during the wrk test, I observe that only one of my cores is near 100% usage, and the rest are pretty much idle.

So, then I tried using isolates, hoping to manually spawn a new isolate/thread for each request:

import 'dart:io';
import 'dart:isolate';

main() {
  HttpServer.bind(InternetAddress.ANY_IP_V4, 8080).then((HttpServer server) {
    server.listen((HttpRequest request) {
      spawnFunction(handleRequest).send(request);
    });
  });
}

handleRequest() {
  port.receive((HttpRequest request, SendPort sender) {
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.start();
    while (stopwatch.elapsedMilliseconds < 1000) { /* do nothing */ }
    request.response.statusCode = HttpStatus.OK;
    request.response.write(stopwatch.elapsedMilliseconds.toString());
    request.response.close().catchError(print);
  });
}

This does not work at all. It doesn't like that I'm trying to send an HttpRequest as the message to the isolate. Here is the error:

#0      _SendPortImpl._sendInternal (dart:isolate-patch/isolate_patch.dart:122:3)
#1      _SendPortImpl._sendNow (dart:isolate-patch/isolate_patch.dart:95:18)
#2      _SendPortImpl.send (dart:isolate-patch/isolate_patch.dart:91:18)
#3      main.<anonymous closure>.<anonymous closure> (file:///C:/Development/dartbenchmark/simple2.dart:7:40)
#4      _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#5      _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#6      _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#7      _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#8      _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#9      StreamController.add (dart:async/stream_controller.dart:10:35)
#10     _HttpServer._handleRequest (http_impl.dart:1261:20)
#11     _HttpConnection._HttpConnection.<anonymous closure> (http_impl.dart:1188:33)
#12     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#13     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#14     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#15     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#16     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#17     StreamController.add (dart:async/stream_controller.dart:10:35)
#18     _HttpParser._doParse (http_parser.dart:415:26)
#19     _HttpParser._parse (http_parser.dart:161:15)
#20     _HttpParser._onData._onData (http_parser.dart:509:11)
#21     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#22     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#23     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#24     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#25     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#26     StreamController.add (dart:async/stream_controller.dart:10:35)
#27     _Socket._onData._onData (dart:io-patch/socket_patch.dart:726:42)
#28     _StreamSubscriptionImpl._sendData (dart:async/stream_impl.dart:475:12)
#29     _StreamImpl._sendData.<anonymous closure> (dart:async/stream_impl.dart:251:29)
#30     _SingleStreamImpl._forEachSubscriber (dart:async/stream_impl.dart:335:11)
#31     _StreamImpl._sendData (dart:async/stream_impl.dart:249:23)
#32     _StreamImpl._add (dart:async/stream_impl.dart:51:16)
#33     StreamController.add (dart:async/stream_controller.dart:10:35)
#34     _RawSocket._RawSocket.<anonymous closure> (dart:io-patch/socket_patch.dart:452:52)
#35     _NativeSocket.multiplex (dart:io-patch/socket_patch.dart:253:18)
#36     _NativeSocket.connectToEventHandler.<anonymous closure> (dart:io-patch/socket_patch.dart:338:54)
#37     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Unhandled exception:
Illegal argument(s): Illegal argument in isolate message : (object is a closure)
#0      _throwDelayed.<anonymous closure> (dart:async/stream_impl.dart:22:5)
#1      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:15:17)
#2      _asyncRunCallback._asyncRunCallback (dart:async/event_loop.dart:25:9)
#3      Timer.run.<anonymous closure> (dart:async/timer.dart:17:21)
#4      Timer.run.<anonymous closure> (dart:async/timer.dart:25:13)
#5      Timer.Timer.<anonymous closure> (dart:async-patch/timer_patch.dart:9:15)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:99:28)
#7      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:107:7)
#8      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:115:23)
#9      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Versions used:

  • Dart Editor version 0.5.9_r22879
  • Dart SDK version 0.5.9.0_r22879

Is it possible to handle these requests in parallel, with all my machine's available cores, using Dart?

Michael Hixson asked May 23 '13 00:05


3 Answers

Even with the current HttpServer limitations it is possible to utilise multiple cores by running multiple server processes behind a reverse proxy server like Apache or Nginx. From within Dart you can also spawn child processes to split out compute-intensive tasks.
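As a rough illustration of the child-process idea, here is a minimal sketch that re-runs its own script as a worker via Process.run (Dart starts new processes rather than forking in the POSIX sense). It assumes the file is run from source with the dart executable; the 'worker' argument convention and the busyWork helper are made up for this example and are not part of any library.

```dart
import 'dart:io';

// Hypothetical stand-in for a compute-intensive task: spins for [millis] ms.
int busyWork(int millis) {
  final stopwatch = Stopwatch()..start();
  while (stopwatch.elapsedMilliseconds < millis) {/* spin */}
  return stopwatch.elapsedMilliseconds;
}

Future<void> main(List<String> args) async {
  if (args.length == 2 && args[0] == 'worker') {
    // Child process: do the work and report the result on stdout.
    print(busyWork(int.parse(args[1])));
    return;
  }

  // Parent process: start four children at once and wait for all of them.
  // Each child is a separate OS process, so they can run on separate cores.
  final script = Platform.script.toFilePath();
  final total = Stopwatch()..start();
  final results = await Future.wait([
    for (var i = 0; i < 4; i++)
      Process.run(Platform.resolvedExecutable, [script, 'worker', '1000']),
  ]);
  for (final result in results) {
    stdout.write(result.stdout);
  }
  print('wall time: ${total.elapsedMilliseconds} ms');
}
```

If the children really run in parallel, the wall time printed by the parent should be much closer to one child's run time than to the sum of all four, though process start-up adds some overhead.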

A good place to start would be to read about scaling node.js, as it also uses a single-thread-per-process architecture.

Edit: This answer is now out of date. It is now possible to share requests between isolates, which allows a single Dart process to use multiple cores.

See the docs for the shared argument of ServerSocket.bind.

"The optional argument shared specify whether additional binds to the same address, port and v6Only combination is possible from the same Dart process. If shared is true and additional binds are performed, then the incoming connections will be distributed between that set of ServerSockets. One way of using this is to have number of isolates between which incoming connections are distributed."

Greg Lowe answered Nov 17 '22 08:11


I wrote a library called dart-isoserver to do this a while back. It's severely bit-rotted now, but you can see the approach.

https://code.google.com/p/dart-isoserver/

What I did was proxy HttpRequest and HttpResponse via isolate ports, since you cannot send them directly. It worked, though there were a few caveats:

  1. I/O on the request and response went through the main isolate, so that part was not parallel. Other work done in the worker isolate didn't block the main isolate though. What really should happen is that a socket connection should be transferable between isolates.
  2. Exceptions in the isolate would bring down the whole server. spawnFunction() now has an uncaught exception handler parameter, so this is somewhat fixable, but spawnUri() doesn't. dart-isoserver used spawnUri() to implement hot-loading, so that would have to be removed.
  3. Isolates are a little slow to start up, and you probably don't want one per connection for the thousands-of-concurrent-connections use cases that nginx and node.js target. An isolate pool with work queues would probably perform better, though that would eliminate the nice feature that you could use blocking I/O in a worker.
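To make the proxying idea concrete, here is a minimal sketch using today's API. It is not dart-isoserver's code: it assumes Dart 2.19 or later, where Isolate.run is available, and the names busyWork, busyWorkInIsolate and handleRequest are illustrative. The HttpRequest never leaves the main isolate; only an int goes to the worker isolate and only an int comes back, so caveat 1 still applies (all socket I/O stays on the main isolate), and a fresh isolate per request carries the start-up cost mentioned in caveat 3.

```dart
import 'dart:async';
import 'dart:io';
import 'dart:isolate';

// CPU-bound stand-in for real work: spins for [millis] ms.
int busyWork(int millis) {
  final stopwatch = Stopwatch()..start();
  while (stopwatch.elapsedMilliseconds < millis) {/* spin */}
  return stopwatch.elapsedMilliseconds;
}

// Runs busyWork in a short-lived worker isolate. The closure captures only
// an int, which is safe to send across the isolate boundary.
Future<int> busyWorkInIsolate(int millis) =>
    Isolate.run(() => busyWork(millis));

Future<void> handleRequest(HttpRequest request) async {
  // The main isolate is free to accept other requests while this awaits.
  final elapsed = await busyWorkInIsolate(1000);
  request.response
    ..statusCode = HttpStatus.ok
    ..write(elapsed);
  await request.response.close();
}

Future<void> main() async {
  final server = await HttpServer.bind(InternetAddress.anyIPv4, 8080);
  await for (final request in server) {
    // Deliberately not awaited, so that requests are handled concurrently.
    unawaited(handleRequest(request));
  }
}
```

An isolate pool with a work queue, as suggested in caveat 3, would replace the per-request Isolate.run call with a fixed set of long-lived isolates fed through SendPorts.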

A note about your first code example. That definitely won't run in parallel, as you noticed, because Dart is single-threaded. No Dart code in the same isolate ever runs concurrently.

Justin Fagnani answered Nov 17 '22 08:11


You need to:

  1. Set shared: true in HttpServer.bind
  2. Spawn some Isolates to handle the incoming requests in parallel.

Here's a barebones, minimal Dart server that distributes the incoming requests across 6 Isolates:

import 'dart:io';
import 'dart:isolate';

void main() async {
  for (var i = 1; i < 6; i++) {
    Isolate.spawn(_startServer, []);
  }

  // Bind one server in current Isolate
  _startServer();

  print('Serving at http://localhost:8080/');
  await ProcessSignal.sigterm.watch().first;
}

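// `args` is unused. It is optional so the function can be called directly
// and nullable so it compiles under null safety.
void _startServer([List? args]) async {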
  final server = await HttpServer.bind(
    InternetAddress.loopbackIPv4,
    8080,
    shared: true, // This is the magic sauce
  );

  await for (final request in server) {
    _handleRequest(request);
  }
}

void _handleRequest(HttpRequest request) async {
  // Fake delay
  await Future.delayed(const Duration(seconds: 2));

  request.response.write('Hello, world!');
  await request.response.close();
}

Iiro Krankka answered Nov 17 '22 07:11