I have a class like so:
import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';
class MyClass extends EventEmitter {
constructor(host = 'localhost', port = 10011) {
super(EventEmitter);
this.host = host;
this.port = port;
this.socket = null;
this.connect();
}
connect() {
this.socket = net.connect(this.port, this.host);
this.socket.on('connect', this.handle.bind(this));
}
handle(data) {
this.socket.on('data', data => {
});
}
send(data) {
this.socket.write(data);
}
}
How would I turn the send
method into a promise, which returns a value from the socket's data
event? The server only sends data back when data is sent to it, other than a connection message which can easily be suppressed.
I've tried something like:
handle(data) {
this.socket.on('data', data => {
return this.socket.resolve(data);
});
this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
return new Promise((resolve, reject) => {
this.socket.resolve = resolve;
this.socket.reject = reject;
this.socket.write(data);
});
}
Obviously this won't work because resolve
/reject
will overwrite each other when chaining and/or calling send
multiple times in parallel.
There's also the problem of calling send
twice in parallel and it resolving whichever response comes back first.
I currently have an implementation using a queue and defers , but it feels messy since the queue is constantly being checked.
I'd like to be able to do the following:
let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
return c.send('bar', response.param);
//`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
console.log(response);
}).catch(error => console.log(error));
Just to add, I don't have any control over the data that is received, meaning it can't be modified outside of the stream.
Edit: So it seems this is pretty impossible, due to TCP not having a request-response flow. How can this be implemented still using promises, but either using a single-execution (one request at a time) promise chain or a queue.
You are passing a callback that defines the specific behavior of your promise. A Promise is a container that gives us an API to manage and transform a value, and its specificity is that it lets us manage and transform values that are actually not already there yet.
Promises are used to handle asynchronous operations in JavaScript. They are easy to manage when dealing with multiple asynchronous operations where callbacks can create callback hell leading to unmanageable code.
Consuming a promise For example, when we request data from a server via an API that returns a promise, we utilize the then() and catch() methods to consume whatever data is delivered. In the above code, the then() method is executed when the promise is fulfilled by the resolve() callback.
A promise is an object which can be returned synchronously from an asynchronous function. It will be in one of 3 possible states: Fulfilled: onFulfilled() will be called (e.g., resolve() was called) Rejected: onRejected() will be called (e.g., reject() was called)
I distilled the problem to the bare minimum and made it browser runnable:
EventEmitter
.The solution works by appending new requests to the promise chain, but allowing maximum one open / not-answered request at any given timepoint. .send
returns a new promise each time it is called and the class takes care of all internal synchronisation. So, .send
may be called multiple times and the correct ordered (FIFO) of requests processing is guaranteed. One additional feature that I added is trimming the promise chain, if there are no pending requests.
Caveat I omitted error handling altogether, butit should be tailored to your particular use case anyway.
DEMO
class SocketMock {
constructor(){
this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) );
this.listeners = {
// 'error' : [],
'data' : []
}
}
send(data){
console.log(`SENDING DATA: ${data}`);
var response = `SERVER RESPONSE TO: ${data}`;
setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),
Math.random()*2000 + 250);
}
on(event, callback){
this.listeners[event].push(callback);
}
}
class SingleRequestCoordinator {
constructor() {
this._openRequests = 0;
this.socket = new SocketMock();
this._promiseChain = this.socket
.connected.then( () => console.log('SOCKET CONNECTED'));
this.socket.on('data', (data) => {
this._openRequests -= 1;
console.log(this._openRequests);
if(this._openRequests === 0){
console.log('NO PENDING REQUEST --- trimming the chain');
this._promiseChain = this.socket.connected
}
this._deferred.resolve(data);
});
}
send(data) {
this._openRequests += 1;
this._promiseChain = this._promiseChain
.then(() => {
this._deferred = Promise.defer();
this.socket.send(data);
return this._deferred.promise;
});
return this._promiseChain;
}
}
var sender = new SingleRequestCoordinator();
sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
setTimeout(() => sender.send('data-4')
.then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With