
How to implement the equivalent of Promise.all for my Task implementation?

Here is my Task implementation (a sort of Promise, but one that complies with the monad laws and is cancelable). It works reliably:

const Task = k =>
  ({runTask: (res, rej) => k(res, rej)});

const tAp = tf => tk =>
  Task((res, rej) => tf.runTask(f => tk.runTask(x => res(f(x)), rej), rej));

const tOf = x => Task((res, rej) => res(x));

const tMap = f => tk =>
  Task((res, rej) => tk.runTask(x => res(f(x)), rej));

const tChain = fm => mx =>
  Task((res, rej) => mx.runTask(x => fm(x).runTask(res, rej), rej));

const log = x => console.log(x);
const elog = e => console.error(e);

const fetchName = (id, cb) => {
  const r = setTimeout(id_ => {
    const m = new Map([[1, "Beau"], [2, "Dev"], [3, "Liz"]]);

    if (m.has(id_))
      return cb(null, m.get(id_));

    else
      return cb("unknown id", null);
  }, 0, id);

  return () => clearTimeout(r);
};

const fetchNameAsync = id =>
  Task((res, rej) =>
    fetchName(id, (err, data) =>
      err === null
        ? res(data)
        : rej(err)));

const a = tAp(tMap(x => y => x.length + y.length)
  (fetchNameAsync(1)))
    (fetchNameAsync(3));

const b = tAp(tMap(x => y => x.length + y.length)
  (fetchNameAsync(1)))
    (fetchNameAsync(5));

a.runTask(log, elog); // 7
b.runTask(log, elog); // Error: "unknown id"

However, I have no idea how to implement awaitAll, which should have the following traits:

  • it either resolves with an array of results of the individual Tasks
  • or it rejects immediately with the first error and cancels all other Tasks
  • it executes Tasks in "parallel"

const awaitAll = ms =>
  Task((res, rej) => ms.map(mx => mx.runTask(...?)));

Any hint is appreciated!

Iven Marquardt asked Mar 16 '19 17:03


2 Answers

Here's another way that takes inspiration from the other answers here as well as the linked folktale/task. Instead of implementing a complicated tAll which takes care of iterating a list of tasks and combining tasks, we'll separate the concerns into individual functions.

Here's a simplified tAnd -

const tAnd = (t1, t2) =>
{ const acc = []

  const guard = (res, i) => x =>
    ( acc[i] = x
    , acc[0] !== undefined && acc[1] !== undefined
        ? res (acc)
        : void 0
    )

  return Task
    ( (res, rej) =>
        ( t1 .runTask (guard (res, 0), rej) // rej could be called twice!
        , t2 .runTask (guard (res, 1), rej) // we'll fix this below
        )
    )
}

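It works like this, where delay (ms, x) is a task that resolves with x after ms milliseconds (it is defined in the full listing further down) -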

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]

Now tAll is a breeze to implement -

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tAnd (t, tAll (...ts))

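Oops, that nests each result inside the next pair, giving [ 'a', [ 'b', [ 'c', [] ] ] ] for three tasks. Don't forget to flatten along the way. Note that tMap here takes the function and the task as two arguments, unlike the curried tMap in the question -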

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll(...ts))
        )

It works like this -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]
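
To see what the flattening step changes, here is a small check that uses only tasks resolving synchronously, so no timers are involved. It is a sketch: tAndSerial is a stand-in for tAnd (an assumption made to keep the example short), and tAllNested, tAllFlat and runSync are hypothetical names for the two tAll definitions above and a helper that reads a synchronously resolved value -

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });
const tOf = v => Task((res, _) => res(v));

// uncurried tMap, as used in this answer
const tMap = (f, t) =>
  Task((res, rej) => t.runTask(x => res(f(x)), rej));

// stand-in for tAnd (assumption): only the shape of the result matters here
const tAndSerial = (t1, t2) =>
  Task((res, rej) =>
    t1.runTask(a => t2.runTask(b => res([a, b]), rej), rej));

// first attempt: every pair nests inside the previous one
const tAllNested = (t, ...ts) =>
  t === undefined
    ? tOf([])
    : tAndSerial(t, tAllNested(...ts));

// second attempt: each pair [x, xs] is flattened to [x, ...xs]
const tAllFlat = (t, ...ts) =>
  t === undefined
    ? tOf([])
    : tMap(([x, xs]) => [x, ...xs], tAndSerial(t, tAllFlat(...ts)));

// read the value of a task that resolves synchronously
const runSync = t => {
  let out;
  t.runTask(x => { out = x; }, e => { throw e; });
  return out;
};

const nested = runSync(tAllNested(tOf('a'), tOf('b'), tOf('c')));
const flat = runSync(tAllFlat(tOf('a'), tOf('b'), tOf('c')));

console.log(JSON.stringify(nested)); // ["a",["b",["c",[]]]]
console.log(JSON.stringify(flat));   // ["a","b","c"]
```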

tAll properly handles errors as well -

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// test passed

Getting tAnd right is surprisingly difficult, even though its scope is much smaller than that of the all-in-one tAll we set out to avoid. The combined task should settle exactly once: it either resolves once or rejects once, never both and never twice. Enforcing these constraints requires a bit more code -

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}
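
Two limitations of this tAnd are worth knowing. The pending check treats undefined as "not resolved yet", so a task that legitimately resolves with undefined leaves the combined task pending forever. Also, the flags live outside the Task, so the combined task can only be run once. A minimal sketch of a variant that counts completions and creates its state on every run follows. The name tAndCounted is hypothetical, and it assumes nothing about the input tasks beyond the runTask (res, rej) interface -

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });
const tOf = v => Task((res, _) => res(v));

const tAndCounted = (t1, t2) =>
  Task((res, rej) => {
    // state is created per run, so the combined task can be run repeatedly
    const result = [];
    const filled = [false, false];
    let remaining = 2;
    let settled = false;

    const onValue = i => x => {
      if (settled || filled[i]) return; // ignore late or repeated calls
      filled[i] = true;
      result[i] = x;                    // undefined is a valid result here
      remaining -= 1;
      if (remaining === 0) { settled = true; res(result); }
    };

    const onError = e => {
      if (settled) return;
      settled = true;
      rej(e);
    };

    t1.runTask(onValue(0), onError);
    t2.runTask(onValue(1), onError);
  });

const seen = [];
const both = tAndCounted(tOf(undefined), tOf('b'));
both.runTask(x => seen.push(x), console.error);
both.runTask(x => seen.push(x), console.error); // a second run resolves too
console.log(seen); // [ [ undefined, 'b' ], [ undefined, 'b' ] ]
```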

Here is the complete program for the parallel tAnd and tAll, which you can run in your own browser to verify the result -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// [ 'a', 'b' ]
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed
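
None of the code so far cancels the tasks that are still in flight when one of them rejects, which the question asks for. In the question's Task, runTask returns whatever the continuation returns, and fetchNameAsync's continuation returns fetchName's () => clearTimeout(r), so runTask hands back a cancel function. Below is a minimal sketch of awaitAll built on that convention. It assumes every task's runTask returns either a cancel function or nothing, and that each task settles at most once. delayC and failAfter are hypothetical cancelable helpers used only for the demo -

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });

// Assumption: runTask returns a cancel function (or undefined).
const awaitAll = ms =>
  Task((res, rej) => {
    const results = new Array(ms.length);
    const cancels = [];
    let remaining = ms.length;
    let settled = false;

    const cancelAll = () =>
      cancels.forEach(c => { if (typeof c === "function") c(); });

    if (remaining === 0) { settled = true; res([]); }

    // start every task up front; stop starting new ones once settled
    for (let i = 0; i < ms.length && !settled; i++) {
      cancels.push(ms[i].runTask(
        x => {
          if (settled) return;
          results[i] = x;
          remaining -= 1;
          if (remaining === 0) { settled = true; res(results); }
        },
        e => {
          if (settled) return;
          settled = true;
          cancelAll(); // cancel everything started so far
          rej(e);
        }));
    }

    // the combined task is cancelable as well
    return () => { settled = true; cancelAll(); };
  });

// hypothetical cancelable helpers for the demo
const delayC = (ms, x) =>
  Task((res, _) => {
    const id = setTimeout(res, ms, x);
    return () => clearTimeout(id);
  });

const failAfter = (ms, e) =>
  Task((_, rej) => {
    const id = setTimeout(rej, ms, e);
    return () => clearTimeout(id);
  });

awaitAll([delayC(30, 'a'), delayC(10, 'b')])
  .runTask(console.log, console.error); // logs [ 'a', 'b' ]

awaitAll([delayC(1000, 'slow'), failAfter(10, 'boom')])
  .runTask(console.log, console.error); // logs boom; the 1000 ms timer is cleared
```

Results are stored by index, so the output order matches the input order regardless of which task finishes first.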

Serial processing

The trickiest bit is the parallel processing requirement. If the requirements had asked for serial behavior, the implementation would be dramatically easier -

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )
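
The serial tAnd is the pairing you get from monadic chaining, so it can also be written with the question's curried tChain and tMap instead of calling runTask by hand. A sketch, where tAndSerial, traced and gated are hypothetical names. The gated task is resolved manually to make it visible that the second task is not started until the first one has resolved -

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });

// curried combinators, as defined in the question
const tMap = f => tk =>
  Task((res, rej) => tk.runTask(x => res(f(x)), rej));

const tChain = fm => mx =>
  Task((res, rej) => mx.runTask(x => fm(x).runTask(res, rej), rej));

// run t1, then t2, then pair the two results
const tAndSerial = (t1, t2) =>
  tChain(a => tMap(b => [a, b])(t2))(t1);

const order = [];

// a task that records when it is started and resolves immediately
const traced = (label, v) =>
  Task((res, _) => { order.push(label); res(v); });

// a task that records its start and resolves only when release is called
let release;
const gated = Task((res, _) => { order.push("first started"); release = res; });

let pair;
tAndSerial(gated, traced("second started", "b"))
  .runTask(x => { pair = x; }, console.error);

console.log(order); // [ 'first started' ]
release("a");
console.log(order); // [ 'first started', 'second started' ]
console.log(pair);  // [ 'a', 'b' ]
```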

The implementation of tAll stays the same, of course. Note the difference in delays, now that the tasks run sequentially -

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

And many tasks with tAll -

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

Here is the complete program for the serial version, which you can run in your own browser to verify the results -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAnd = (t1, t2) =>
  Task
    ( (res, rej) =>
        t1 .runTask
          ( a =>
              t2 .runTask
                ( b =>
                    res ([ a, b ])
                , rej
                )
          , rej
          )
    )

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tAnd
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~2.5 seconds later
// [ 'a', 'b' ]

tAll
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~ 9 seconds later
// [ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ]

tAll
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

How to implement tOr and tRace

For the sake of completeness, here's tOr. Note that tOr here is equivalent to folktale's Task.concat -

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

It resolves or rejects with whichever of the two tasks completes first -

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

And tRace -

const tRace = (t = tOf (undefined), ...ts) =>
  ts .reduce (tOr, t)

It resolves or rejects with whichever of the many tasks completes first -

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// 'f'
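
Reducing with tOr builds a nested chain of guards, one per pair. The same behavior can be sketched with a single flag shared by all tasks. tRaceFlat and once are hypothetical names, and unlike tRace this sketch never settles when called with no tasks, because there is no default task -

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });

const tRaceFlat = (...ts) =>
  Task((res, rej) => {
    let settled = false;

    // let only the first call through, whether it is a resolve or a reject
    const once = f => x => {
      if (settled) return;
      settled = true;
      f(x);
    };

    ts.forEach(t => t.runTask(once(res), once(rej)));
  });

const delay = (ms, x) =>
  Task(r => setTimeout(r, ms, x));

tRaceFlat(delay(40, "a"), delay(10, "b"), delay(25, "c"))
  .runTask(console.log, console.error); // logs b after ~10 ms
```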

Here is the complete program for tOr and tRace, which you can run in your own browser to verify the results -

const Task = k =>
  ({ runTask: (a, b) => k (a, b) })

// tOf is needed by tRace's default parameter
const tOf = v =>
  Task ((res, _) => res (v))

const tOr = (t1, t2) =>
{ let resolved = false
  let rejected = false

  const guard = (res, rej) =>
    [ x =>
        resolved || rejected
          ? void 0
          : ( resolved = true
            , res (x)
            )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej))
        , t2 .runTask (...guard (res, rej))
        )
    )
}

const tRace = (t = tOf (undefined), ...ts) =>
  ts .reduce (tOr, t)

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

tOr
  ( delay (2000, 'a')
  , delay (500, 'b')
  )
  .runTask (console.log, console.error)

// ~500 ms later
// 'b' 

tRace
  ( delay (2000, 'a')
  , delay (500, 'b')
  , delay (900, 'c')
  , delay (1500, 'd')
  , delay (1800, 'e')
  , delay (300, 'f')
  , delay (2000, 'g')
  )
  .runTask (console.log, console.error)

// ~300 ms later
// note `f` appears in the output first because this tRace demo finishes before the tOr demo above
// 'f'

tRace
  ( delay (100, 'test failed')
  , Task ((_, rej) => rej ('test passed'))
  )
  .runTask (console.log, console.error)

// Error: test passed

How to implement tAp

In the comments, we're talking about the applicative, tAp. I think tAll makes the implementation rather easy -

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

tAp accepts a task-wrapped function and any number of task-wrapped values, and returns a new task -

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15

Unless the tasks have side effects, I cannot see a reason why a "parallel" implementation of tAp would break the applicative laws.
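
The question's tAp is curried and takes exactly one function task and one value task. A parallel version with that same shape can be sketched from a two-task pairing. tBoth and tApPar are hypothetical names, and tBoth is a compact stand-in for the guarded tAnd above -

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });
const tOf = x => Task((res, rej) => res(x));

// curried tMap, as in the question
const tMap = f => tk =>
  Task((res, rej) => tk.runTask(x => res(f(x)), rej));

// start both tasks, resolve with the pair once both results are in
const tBoth = (t1, t2) =>
  Task((res, rej) => {
    const result = [];
    let remaining = 2;
    let settled = false;

    const onValue = i => x => {
      if (settled) return;
      result[i] = x;
      remaining -= 1;
      if (remaining === 0) { settled = true; res(result); }
    };

    const onError = e => {
      if (settled) return;
      settled = true;
      rej(e);
    };

    t1.runTask(onValue(0), onError);
    t2.runTask(onValue(1), onError);
  });

// same call shape as the question's tAp: tApPar(tf)(tx)
const tApPar = tf => tx =>
  tMap(([f, x]) => f(x))(tBoth(tf, tx));

let total;
tApPar(tMap(x => y => x.length + y.length)(tOf("Beau")))(tOf("Liz"))
  .runTask(x => { total = x; }, console.error);

console.log(total); // 7
```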

Here is the complete program for tAp, which you can run in your own browser to verify the results -

const Task = k =>
  ({ runTask: (res, rej) => k (res, rej) })

const tOf = v =>
  Task ((res, _) => res (v))

const tMap = (f, t) =>
  Task
    ( (res, rej) =>
        t.runTask
          ( x => res (f (x)) 
          , rej
          )
    )

const tAp = (f, ...ts) =>
  tMap
    ( ([ f, ...xs ]) => f (...xs)
    , tAll (f, ...ts)
    )

const tAnd = (t1, t2) =>
{ let resolved = false
  let rejected = false
  
  const result = []

  const pending = ([ a, b ] = result) =>
    a === undefined || b === undefined

  const guard = (res, rej, i) =>
    [ x =>
        ( result[i] = x
        , resolved || rejected || pending ()
            ? void 0
            : ( resolved = true
              , res (result)
              )
        )
    , e =>
        resolved || rejected
          ? void 0
          : ( rejected = true
            , rej (e)
            )
    ]

  return Task
    ( (res, rej) =>
        ( t1 .runTask (...guard (res, rej, 0))
        , t2 .runTask (...guard (res, rej, 1))
        )
    )
}

const tAll = (t, ...ts) =>
  t === undefined
    ? tOf ([])
    : tMap
        ( ([ x, xs ]) => [ x, ...xs ]
        , tAnd (t, tAll (...ts))
        )

const delay = (ms, x) =>
  Task (r => setTimeout (r, ms, x))

const sum = (v, ...vs) =>
  vs.length === 0
    ? v
    : v + sum (...vs)

tAp
  ( delay (2000, sum)
  , delay (500, 1)
  , delay (900, 2)
  , delay (1500, 3)
  , delay (1800, 4)
  , delay (300, 5)
  )
  .runTask (console.log, console.error)

// ~2 seconds later
// 15
Mulan answered Sep 28 '22 00:09


Another solution uses recursion with a two-Task base case, which lets the pending results be tracked in just two variables:

  const tAll = ([first, second, ...rest]) =>
   !second
     ? first
     : rest.length 
        ? tMap(
            results => results.flat()
          )(tAll([ tAll([first, second]), tAll(rest) ]))
        : Task((res, rej, a, b, done) => (
            first.runTask(
               value => !done && b ? (res([value, b.value]), done = true) : (a = { value }),
               err => !done && (rej(err), done = true)
            ),
            second.runTask(
               value => !done && a ? (res([a.value, value]), done = true) : (b = { value }),
              err => !done && (rej(err), done = true)
            ) 
         ));
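
A few edge cases of this tAll are worth noting. A single task resolves with its bare value rather than a one-element array, an empty list returns undefined instead of a Task, and results.flat() also flattens a lone trailing result that happens to be an array. Below is a sketch of one way to handle them, using the question's curried tMap. The names tPair and tAllArr are hypothetical, and this is only one possible reading of the intended behavior (always resolve with an array, as Promise.all does):

```javascript
const Task = k => ({ runTask: (res, rej) => k(res, rej) });
const tOf = x => Task((res, rej) => res(x));
const tMap = f => tk =>
  Task((res, rej) => tk.runTask(x => res(f(x)), rej));

// two-task base case: a and b hold the result that arrived first
const tPair = (first, second) =>
  Task((res, rej) => {
    let a, b, done = false;

    const fail = err => {
      if (done) return;
      done = true;
      rej(err);
    };

    first.runTask(value => {
      if (done) return;
      if (b) { done = true; res([value, b.value]); }
      else a = { value };
    }, fail);

    second.runTask(value => {
      if (done) return;
      if (a) { done = true; res([a.value, value]); }
      else b = { value };
    }, fail);
  });

const tAllArr = ([first, second, ...rest]) =>
  first === undefined
    ? tOf([])                          // no tasks: empty result
    : second === undefined
      ? tMap(x => [x])(first)          // one task: wrap its value
      : rest.length === 0
        ? tPair(first, second)
        : tMap(([xs, ys]) => [...xs, ...ys])  // both sides are arrays here
            (tPair(tPair(first, second), tAllArr(rest)));

let out;
tAllArr([tOf(1), tOf(2), tOf([3])])
  .runTask(x => { out = x; }, console.error);

console.log(JSON.stringify(out)); // [1,2,[3]]
```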
Jonas Wilms answered Sep 28 '22 01:09