 

Asking for examples of async generators not directly transformable into manually implemented async iteration

Async generators use an internal queue to handle synchronous next, throw, and return method calls.

I was trying to construct a situation where this queue is mandatory for the success of the iteration itself. Therefore, I'm looking for some cases where a manual implementation of the async iteration interfaces, without a custom reimplementation of the queue, is not enough.
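
For reference, here is a minimal sketch (not part of the original question) of what that queue buys you: several next() calls issued in the same tick still resolve strictly in order, because the generator body runs one step at a time.

async function* numbers() {
    let i = 1;
    while (i <= 3) {
        // simulate a slow async source with random latency
        await new Promise(resolve => setTimeout(resolve, Math.random() * 100));
        yield i++;
    }
}

const gen = numbers();

// four next() calls issued synchronously: the generator queues them,
// so the values arrive in order despite the random delays
Promise.all([gen.next(), gen.next(), gen.next(), gen.next()])
    .then(results => console.log(results.map(r => r.value))); // [1, 2, 3, undefined]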

The following example is not a great one, because overall time consistency is not maintained, yet the iteration result is still correct at each step:

function aItsFactory() {
    let i = 1;
    return {
        async next() {
            if(i > 5) return Promise.resolve({ value: void 0, done: true });
            const res = await fetch(`https://jsonplaceholder.typicode.com/posts/${i++}`).then(x => x.json());
            return Promise.resolve({ value: res, done: false });
        },
        [Symbol.asyncIterator]() { 
            return this;
        }
    }
}

const ait = aItsFactory();


// General time consistency is lost because, for example, the fourth call
// is started together with the previous three and could finish before them.

// But the 'i' state is correctly shared, so the fifth call
// correctly requests element number five from the source,
// and the last call correctly receives { done: true }.

;(async () => {
      ait.next();
      ait.next();
      ait.next();
      ait.next();
      console.log(await ait.next()); // { done: false, value: { userId: 1, id: 5, title: ... } }

      console.log(await ait.next()); // { done: true, value: undefined }
})();

It could be argued that without a proper queue the very concept of iteration is lost, because of the next calls running in parallel.

Anyway, I'd like to find some examples, even trivial ones, which make clear that async generators are a better approach for creating well-formed async iterables than a manual implementation of the async iteration interfaces.
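
For comparison, here is a sketch of the same source written as an async generator; nothing is enqueued by hand, yet overlapping next() calls are serialized by the engine's internal queue, so request number n never starts before request n - 1 has settled:

async function* aItsGenerator() {
    for (let i = 1; i <= 5; i++) {
        // each request starts only after the previous next() has settled
        const res = await fetch(`https://jsonplaceholder.typicode.com/posts/${i}`);
        yield await res.json();
    }
}

const agit = aItsGenerator();
agit.next();
agit.next(); // queued: the second request starts only after the first one has finished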

------ Edit ------

Let's talk about an improved situation:

function aItsFactory() {
    let i = 1;
    let done = false;

    return {
        async next() {

            if (done) return Promise.resolve({
                done: true,
                value: undefined
            });

            const res = await fetch(`https://jsonplaceholder.typicode.com/posts/${i++}`).then(x => x.json());

            if (Object.keys(res).length === 0) { // the jsonplaceholder source is out of bounds
                done = true;
                return Promise.resolve({
                    done: true,
                    value: undefined
                });
            } else {
                return Promise.resolve({
                    done: false,
                    value: res
                });
            }

        },
        [Symbol.asyncIterator]() {
            return this;
        }
    }
}

const ait = aItsFactory();

// now a lot of synchronous calls to 'ait.next'

Here the done resolution is fully asynchronous. From an async iteration perspective, the code is wrong because each next call should be forced to await the outcome of the previous one to know whether it was the last valid iteration. In that case, the current next should do nothing and immediately return Promise.resolve({ done: true, value: undefined }). This is only possible thanks to a queue of synchronous next calls.
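
For illustration, here is a sketch (a simplification, not the spec's AsyncGeneratorRequest queue) of the kind of wrapper a manual implementation would need to get that behaviour: every next() call is chained onto the previous one, so once one call has reported { done: true }, every later call resolves to { done: true } without ever reaching the source.

function serializeNext(iterator) {
    let last = Promise.resolve({ done: false, value: undefined });
    return {
        next() {
            last = last.then(prev =>
                prev.done
                    ? { done: true, value: undefined } // a previous step already ended the iteration
                    : iterator.next()
            );
            return last;
        },
        [Symbol.asyncIterator]() {
            return this;
        }
    };
}

const safeAit = serializeNext(aItsFactory());
// many synchronous safeAit.next() calls now fire one AJAX request at a time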

But in practice, the main consequence of going out of bounds by calling ait.next() repeatedly is a few useless AJAX requests. Don't misunderstand me: I'm not saying we can turn a blind eye to that. The point is that no single step of the async iteration itself is ever broken.

I'd like to see a situation, not too unrealistic, where the iteration itself could be compromised at any step if the next calls are not enqueued.

Asked Aug 20 '19 by Andrea Simone Costa


1 Answer

Consider the following scenario:

You have a stream of datasets coming in, e.g. from some API. You want to do some heavy calculation on each dataset, which is why you send each one to a worker. But the API might sometimes deliver several datasets at once, and you don't want a lot of workers running at the same time; instead you want a limited number of them. Across those datasets you are searching for a specific result. With async iterators you could write it as:

const incoming = createSomeAsyncIterator();

async function processData() {
  let done, value;
  while (!done) {
    ({ done, value } = await incoming.next());
    if (!done) {
      const result = await searchInWorker(value);
      if (result) {
        incoming.return();
        return result;
      }
    }
  }
}

// Consume tasks in two workers.
Promise.race([
  processData(), processData()
]).then(gold => /*...*/);

The code above would fail if .next() didn't return the datasets in order: one of the workers might keep going although the search is already done, or two workers might end up working on the same dataset.
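
createSomeAsyncIterator and searchInWorker are not defined in the answer; purely hypothetical stand-ins, just to make the example self-contained, could look like this:

async function* createSomeAsyncIterator() {
  for (let id = 0; id < 100; id++) {
    // pretend a new dataset arrives from some API
    await new Promise(resolve => setTimeout(resolve, 50));
    yield { id, payload: Math.random() };
  }
}

async function searchInWorker(dataset) {
  // stand-in for posting the dataset to a Worker and awaiting its answer
  await new Promise(resolve => setTimeout(resolve, 200));
  return dataset.payload > 0.99 ? dataset : null;
}

Because both processData() calls pull from the same async generator, its internal queue hands each dataset to exactly one worker.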


Or the rate limiting example (stolen from Bergi :)):

async function* rateLimit(limit, time) {
  let count = 0;
  while (true) {
    if (count++ >= limit) {
      await delay(time);
      count = 0;
    }
    yield; // run api call
  }
}

const userAPIRate = rateLimit(10, 1000);
async function getUser(id) {
  await userAPIRate.next();
  return doCall("/user/", id);
}
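
delay and doCall are also left undefined; minimal stand-ins (the endpoint here is an assumption) might be:

const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

async function doCall(path, id) {
  // hypothetical API call
  const res = await fetch(`https://api.example.com${path}${id}`);
  return res.json();
}

Even when many getUser calls are issued in the same tick, the generator queues the userAPIRate.next() calls, so the count bookkeeping inside rateLimit never races.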

Or imagine you want to show a stream of pictures in some form of gallery (in React):

import { useState, useEffect } from "react";

const images = streamOfImages();

const Image = () => {
  const [image, setImage] = useState(null);

  useEffect(async () => {
    if (image) await delay(10000); // show each image for at least 10 secs
    const { value } = await images.next();
    setImage(value);
  }, [image]);

  return <img src={image || "loading.png"} />;
};

const Gallery = () => <div>
  <Image /> <Image /> <Image />
</div>;
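
streamOfImages is again assumed; a hypothetical source could be:

async function* streamOfImages() {
  let n = 0;
  while (true) {
    // e.g. poll some endpoint for the next picture URL
    await new Promise(resolve => setTimeout(resolve, 1000));
    yield `https://pictures.example.com/${n++}.jpg`;
  }
}

Since the three <Image /> components pull from one shared iterator, the generator's queue guarantees that no two of them receive the same picture.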

And another one, scheduling tasks onto a worker so that only one runs at a time:

const worker = (async function* () {
  let task;
  while (true) task = yield task && await doInWorker(task);
})();

worker.next();

worker.next("task 1").then(taskOne => ...);
worker.next("task 2").then(taskTwo => ...);
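
doInWorker is assumed as well; a hypothetical stand-in:

async function doInWorker(task) {
  // pretend the task is posted to a Worker and we await its answer
  await new Promise(resolve => setTimeout(resolve, 500));
  return `${task} done`;
}

The first worker.next() only primes the generator, running it up to its first yield; every later next(task) call is queued, so only one doInWorker call is in flight at a time even though "task 1" and "task 2" are submitted synchronously.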
Answered Oct 06 '22 by Jonas Wilms