Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How can I control program flow using events and promises?

I have a class like so:

import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';

class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super(EventEmitter);
        this.host = host;
        this.port = port;
        this.socket = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', this.handle.bind(this));
    }
    handle(data) {
        this.socket.on('data', data => {

        });
    }
    send(data) {
        this.socket.write(data);
    }
}

How would I turn the send method into a promise, which returns a value from the socket's data event? The server only sends data back when data is sent to it, other than a connection message which can easily be suppressed.

I've tried something like:

handle(data) {
    this.socket.on('data', data => {
        return this.socket.resolve(data);
    });
    this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
    return new Promise((resolve, reject) => {
        this.socket.resolve = resolve;
        this.socket.reject = reject;
        this.socket.write(data);
    });
}

Obviously this won't work because resolve/reject will overwrite each other when chaining and/or calling send multiple times in parallel.

There's also the problem of calling send twice in parallel and it resolving whichever response comes back first.

I currently have an implementation using a queue and defers , but it feels messy since the queue is constantly being checked.

I'd like to be able to do the following:

let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
    return c.send('bar', response.param);
    //`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
    console.log(response);
}).catch(error => console.log(error));

Just to add, I don't have any control over the data that is received, meaning it can't be modified outside of the stream.

Edit: So it seems this is pretty impossible, due to TCP not having a request-response flow. How can this be implemented still using promises, but either using a single-execution (one request at a time) promise chain or a queue.

like image 639
Ben Fortune Avatar asked Jul 10 '15 13:07

Ben Fortune


People also ask

How do promises work under the hood?

You are passing a callback that defines the specific behavior of your promise. A Promise is a container that gives us an API to manage and transform a value, and its specificity is that it lets us manage and transform values that are actually not already there yet.

What is the use of promises in JavaScript?

Promises are used to handle asynchronous operations in JavaScript. They are easy to manage when dealing with multiple asynchronous operations where callbacks can create callback hell leading to unmanageable code.

How do I use promises in node JS?

Consuming a promise For example, when we request data from a server via an API that returns a promise, we utilize the then() and catch() methods to consume whatever data is delivered. In the above code, the then() method is executed when the promise is fulfilled by the resolve() callback.

What is a JavaScript Promise and what are the names of the two main parts of this type of object and how do they work together to replace an asynchronous callback?

A promise is an object which can be returned synchronously from an asynchronous function. It will be in one of 3 possible states: Fulfilled: onFulfilled() will be called (e.g., resolve() was called) Rejected: onRejected() will be called (e.g., reject() was called)


1 Answers

I distilled the problem to the bare minimum and made it browser runnable:

  1. Socket class is mocked.
  2. Removed info about port, host and inheritance from EventEmitter.

The solution works by appending new requests to the promise chain, but allowing maximum one open / not-answered request at any given timepoint. .send returns a new promise each time it is called and the class takes care of all internal synchronisation. So, .send may be called multiple times and the correct ordered (FIFO) of requests processing is guaranteed. One additional feature that I added is trimming the promise chain, if there are no pending requests.


Caveat I omitted error handling altogether, butit should be tailored to your particular use case anyway.


DEMO

class SocketMock {

  constructor(){
    this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); 
    this.listeners = {
  //  'error' : [],
    'data' : []
    }
  }

  send(data){

    console.log(`SENDING DATA: ${data}`);
    var response = `SERVER RESPONSE TO: ${data}`;
    setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),               
               Math.random()*2000 + 250); 
  }

  on(event, callback){
    this.listeners[event].push(callback); 
  }

}

class SingleRequestCoordinator {

    constructor() {
        this._openRequests = 0; 
        this.socket = new SocketMock();
        this._promiseChain = this.socket
            .connected.then( () => console.log('SOCKET CONNECTED'));
      this.socket.on('data', (data) => {
        this._openRequests -= 1;
        console.log(this._openRequests);
        if(this._openRequests === 0){
          console.log('NO PENDING REQUEST --- trimming the chain');
          this._promiseChain = this.socket.connected
        }
        this._deferred.resolve(data);
      });

    }

    send(data) {
      this._openRequests += 1;
      this._promiseChain = this._promiseChain
        .then(() => {
            this._deferred = Promise.defer();
            this.socket.send(data);
            return this._deferred.promise;
        });
      return this._promiseChain;
    }
}

var sender = new SingleRequestCoordinator();

sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));

setTimeout(() => sender.send('data-4')
    .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
like image 84
artur grzesiak Avatar answered Sep 20 '22 12:09

artur grzesiak