Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Dart: how to manage concurrency in async function

I really like the async/await pattern in Dart. It allows me to write readable methods.

But, there are a couple of things that are problematic, one in particular, I don't know hot to manage at all.

The problem is that with async and multiple await inside a method, we introduce concurrency in the method. For example If I have a method:

Future<int> foo(int value) async {
await foo2();
await foo3();
await foo4();
int ret = foo5(value);
return ret;
}

Well, this is a really simple example. The problem here is that, for every await, the method is put in the event loop. That is OK, when you understand it, but this does not prevent your application from calling again the method befor it has retuned a value.

Consider if the method is managing data that is common to the instance of the class and not to the method itself.

So, I have tried the following solution:

bool isWorking = false;

Future<int> foo(int value) async {
if (isWorking) return foo(value);
isWorking = true;

await foo2();
await foo3();
await foo4();
int ret = foo5(value);

isWorking = False;

return ret;
}

As far as I have understood, calling a future method put it immediately in the event loop, so I thought that the execution of the concurrent call of the method was delayed until the first one was ended. But it is not like that, the program enters in an endless loop.

Anywone can give me an explanation and a solution to this question?

Edit: in general I think that it could be interesting to have, like in other languages, a synchronized keyword, with the meaning that the method, if called a second time will wait until the first has ended. Something like:

Future<int> foo(int value) async synchronized {

Edit 2:

I'm really excited because I think I got the solution for this problem that I had for a long time. Thanks to Argenti and in particular to Alexandre that give me the solution. I have simply restructured the solution for easy reuse (at least for me) and I post it here the class I have created and an example on how to use it for those who could need it (try at your own risk ;-) ). I have used a mixin because I find it practical, but you can use the Locker class alone if you like.

myClass extends Object with LockManager {

  Locker locker = LockManager.getLocker();

  Future<int> foo(int value) async {

   _recall() {
      return foo(value);
   } 

   if (locker.locked) {
     return await locker.waitLock();
   }
   locker.setFunction(_recall);
   locker.lock();

   await foo2();
   await foo3();
   await foo4();
   int ret = foo5(value);

   locker.unlock();

   return ret;
  }
}

The class is:

import 'dart:async';

class LockManager {

static Locker getLocker() => new Locker();

}

class Locker {

  Future<Null> _isWorking = null;
  Completer<Null> completer;
  Function _function;
  bool get locked => _isWorking != null;

  lock() {
    completer = new Completer();
    _isWorking = completer.future;
  }

  unlock() {
    completer.complete();
    _isWorking = null;
  }

  waitLock() async {
      await _isWorking;
      return _function();
  }

  setFunction(Function fun) {
    if (_function == null) _function = fun;
  }

}

I have structured the code this way so that you can use it easily in more than one method inside your classes. In this case you need a Locker instance per method. I hope that it can be useful.

like image 615
J F Avatar asked Feb 06 '17 15:02

J F


People also ask

Does Dart support concurrency?

Dart supports concurrent programming with async-await, isolates, and classes such as Future and Stream .

Is async await concurrent?

It is concurrent, in the sense that many outstanding asychronous operations may be in progress at any time. It may or may not be multithreaded. By default, await will schedule the continuation back to the "current execution context".

Does Dart support multithreading?

In a programming language like Java, developers can create multiple threads and share the same memory. Dart allows us to create multiple Isolates what is similar to multithreading, but it's not.

How do you use async in Dart?

async: You can use the async keyword before a function's body to mark it as asynchronous. async function: An async function is a function labeled with the async keyword. await: You can use the await keyword to get the completed result of an asynchronous expression. The await keyword only works within an async function.


4 Answers

Instead of a boolean you can use a Future and a Completer to achieve what you want:

Future<Null> isWorking = null;

Future<int> foo(int value) async {
  if (isWorking != null) {
    await isWorking; // wait for future complete
    return foo(value);
  }

  // lock
  var completer = new Completer<Null>();
  isWorking = completer.future;

  await foo2();
  await foo3();
  await foo4();
  int ret = foo5(value);

  // unlock
  completer.complete();
  isWorking = null;

  return ret;
}

The first time the method is call isWorking is null, doesn't enter the if section and create isWorking as a Future that will be complete at the end of the method. If an other call is done to foo before the first call has complete the Future isWorking, this call enter the if section and it waits for the Future isWorking to complete. This is the same for all calls that could be done before the completion of the first call. Once the first call has complete (and isWorking is set to null) the awaiting calls are notified they will call again foo. One of them will be entering foo as the first call and the same workflow will be done.

See https://dartpad.dartlang.org/dceafcb4e6349acf770b67c0e816e9a7 to better see the workflow.

like image 112
Alexandre Ardhuin Avatar answered Oct 22 '22 11:10

Alexandre Ardhuin


The answers are fine, here's just one more implementation of a "mutex" that prevents async operations from being interleaved.

class AsyncMutex {
  Future _next = new Future.value(null);
  /// Request [operation] to be run exclusively.
  ///
  /// Waits for all previously requested operations to complete,
  /// then runs the operation and completes the returned future with the
  /// result.
  Future<T> run<T>(Future<T> operation()) {
    var completer = new Completer<T>();
    _next.whenComplete(() {
      completer.complete(new Future<T>.sync(operation));
    });
    return _next = completer.future;
  }
}

It doesn't have many features, but it's short and hopefully understandable.

like image 38
lrn Avatar answered Oct 22 '22 13:10

lrn


It is now 2021, we can use synchronized 3.0.0

var lock = new Lock();
Future<int> foo(int value) async {
    int ret;
    await lock.synchronized(() async {
        await foo2();
        await foo3();
        await foo4();
        ret = foo5(value);
    }
    return ret;
}
like image 24
s k Avatar answered Oct 22 '22 11:10

s k


I guess what is really needed is a Dart library that implements concurrency management primitives such as locks, mutexs and semaphores.

I recently used the Pool package to effectively implement a mutex to prevent 'concurrent' access to resource. (This was 'throw away' code, so please don't take it as a high quality solution.)

Simplifying the example slightly:

final Pool pool = new Pool(1, timeout: new Duration(...));

Future<Null> foo(thing, ...) async {
  PoolResource rp = await pool.request();

  await foo1();
  await foo2();
  ...

  rp.release();
}

Requesting the resource from the pool before calling the async functions inside foo() ensures that when multiple concurrent calls too foo() are made, calls to foo1() and foo2() don't get 'interleaved' improperly.

Edit:

There appear to be several packages that address providing mutexes: https://www.google.com/search?q=dart+pub+mutex.

like image 29
Argenti Apparatus Avatar answered Oct 22 '22 12:10

Argenti Apparatus