Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

socket.io client automatically disconnecting in long Node.js function

I am using socket.io to communicate the swift client of my app with the server. Essentially, the client joins a socket connection upon opening the app and a job is instantly added to a Redis queue (it's a job that takes anywhere from a few seconds to like 15ish seconds). There's a response from the server to the client of the job id. While this job is processing, SOMETIMES the client will disconnect. There doesn't seem to be a rhyme or reason behind this, as the time of disconnection is totally inconsistent and it's also not like the disconnection is happening at a specific point in the function. I thought maybe I was manually disconnecting from the client side so I set up socket emissions right before each disconnect on the client side (when these emissions were emitted to the server, the server prints something that tells me where the disconnect came from). This showed me that the disconnect is automatic, because the emission is never received by the client before ending the socket connection. This is running on Heroku. Here's my code:

//queue initialization
const queue = new Queue('queue', process.env.REDIS_URL)

//client pings this endpoint to get the job id in the queue
app.post('/process', async function(request, response) {
  let job = await queue({request: request.body});
  console.log("Logging job as " + job.id)
  response.json({ id: job.id });
});

queue.process(10, async (job) => { //10 is the max workers per job
    console.log("Started processing")
    const client = await pool.connect()
    let item = job.data.request
    let title = item.title
    let subtitle = item.subtitle
    let id = item.id
    io.to(id).emit("Processing1", ""); //added emissions like these because I thought maybe the socket was timing out, but this didn't help
    console.log("Processing1");

    try {
      await client.query('BEGIN')
        let geoData = await //promise of geocoding endpoint api function
        let lengthOfGeoData = geoData.context.length
        io.to(id).emit("Processing2", "");
        console.log("Processing2");
        var municipality = ""
        var area = ""
        var locality = ""
        var place = ""
        var district = ""
        var region = ""
        var country = ""
        //for loop to go through geoData and set the above values
      if (municipality != "") {
        console.log("Signing in from " + municipality + ", " + area);
      } else {
        console.log("Signing in from " + area)
      }
      await scrape(municipality, area, id);
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log(err)
    }
    try {
      await client.query('BEGIN')
      const array = await //a function that queries a Postgres db for some rows, makes json objects out of them, and pushes to the 'array' variable
      var array2 = []
      for (a of array) {
        let difference = getDifference(title, subtitle, a.title, a.subtitle) //math function
        if (difference <= 10) {
          array.push(a)
        }
      }
      io.to(id).emit("Processing9", "");
      console.log("Processing9");
      await client.query('COMMIT')
    } catch(err) {
      await client.query('ROLLBACK')
      console.log("ERROR: Failed arrayHelperFunction")
      console.log(err)
    } finally {
      client.release()
      console.log("About to emit this ish to " + id) //should emit to socket here ideally to notify that the processing is done and results can be polled
      io.to(id).emit("finishedLoading", "")
      return array2;
    }
});

//when the client polls the queue after it's received the 'done' notifier from the server
app.post('/poll', async function(request, response) {
  console.log("Polling")
  let id = request.body.id
  const results = await queue(id);
  for (r of results.returnvalue) {
    console.log("Sending " + r.title);
  }
  response.send(results.returnvalue)
});

//scrape
async function scrape(municipality, area, id) {
  const client = await pool.connect();
  try {
    await client.query('BEGIN')
    var location = ""
    if (municipality != "") {
      location = municipality + ", " + area
    } else {
      location = area
    }
    let inDatabase = await client.query('SQL statement AS it_does_exist', [params]);
    io.to(id).emit("Processing3", "");
    console.log("Processing3");
    if (inDatabase.rows[0].it_does_exist == false) { 
      let query = "book clubs near " + location
      var terminationTime = new Date()
      terminationTime.setHours(terminationTime.getHours() + 4);
      let date = ("0" + terminationTime.getDate()).slice(-2);
      let month = ("0" + (terminationTime.getMonth() + 1)).slice(-2);
      let year = terminationTime.getFullYear();
      let hours = terminationTime.getHours();
      let minutes = terminationTime.getMinutes();
      let seconds = terminationTime.getSeconds();
      let timestamp = year + "-" + month + "-" + date + " " + hours + ":" + minutes + ":" + seconds

      try {
        await client.query(`SQL statement`, [params]);
      } catch(err) {
        console.log("FAILURE: scrape() at 1.")
        console.log(err)
      }

      var queryLocation = "New York,New York,United States" //default search origination is here
      var queryGLCode = "US"
      io.to(id).emit("Processing4", "");
      console.log("Processing4");
      try {
        await fetch('https://serpapi.com/locations.json?q='+municipality+'&limit=10', { method : "GET" })
          .then(res => res.json())
          .then((json) => {
            for (let index = 0; index < 10; index++) {
              let locationAPIName = json[index].canonical_name
              let locationAPICode = json[index].country_code
              let resultLatitude = json[index].gps[1];
              let resultLongitude = json[index].gps[0];
            }
          });
      } catch(err) {
        console.log("FAILURE: scrape() at 2.")
        console.log(err)
      }
      io.to(id).emit("Processing5", "");
      console.log("Processing5");
      try {
        await Promise.all([
          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode}).then(data => async function(){
            try {
              await client.query('BEGIN');
              let results = data.events_results
              if (results != null) {
                console.log("first HAD results")
                for (result of results) {
                  var fixedAddress = result.address[0]
                  let address = fixedAddress + ", " + result.address[1]
                      
                  let title = result.title + address

                  var description = result.description

                  let geoData = await geocode(address); //mapbox geocode the address
                  let latitude = Number(geoData.center[0]);
                  let longitude = Number(geoData.center[1]);
                  
                    await client.query(`SQL statement`, [params]);
                  
                }
                io.to(id).emit("Processing6", "");
                console.log("Processing6");
              } else {
                console.log("first DID NOT have results")
              }
              console.log("FIRST BLOCK")
              await client.query('COMMIT');
            } catch(err) {
              console.log("Results[0] not found.")
              console.log(err)
              await client.query('ROLLBACK');
            }
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "10"}).then(data => async function(){
            // same as the one above, just with an offset
          }()),

          searchEvents({engine: "google_events", q: query, location: queryLocation, hl: "en", gl: queryGLCode, start: "20"}).then(data => async function(){
            // same as the one above, but with a different offset
          }())
        ])
      } catch(err) {
        console.log("FAILURE: scrape() at 3.")
        console.log(err)
      }

    } else {
      console.log("Location already in the database.")
    }
    await client.query('COMMIT')
  } catch(err) {
    await client.query('ROLLBACK')
    console.log(err)
  } finally {
    client.release()
    return "Resolved";
  }
}

//Client establish socket connection
func establishConnection(_ completion: (() -> Void)? = nil) {
    let socketUrlString: String = appState.server
    self.manager = SocketManager(socketURL: URL(string: socketUrlString)!, config: [.log(false), .reconnects(true), .extraHeaders(["header": "customheader"])])
    self.socket = manager?.defaultSocket
    self.socket?.connect()
    self.socket?.once(clientEvent: .connect, callback: { (data, emitter) in
        if completion != nil{
            completion!()
        }
    })
  //other socket functions
}

//Client initial post request
func process() {
    let server = "serverstring" + "process"
    let title = "title"
    let subtitle = "subtitle"
    let package = BookPackage(title: title, subtitle: subtitle, id: mySocketID) //this is after the initial connection
    print("package is \(package)")
            
    guard let url  = URL(string: server) else { return }

    var urlRequest = URLRequest(url: url)
    
    urlRequest.addValue("application/json", forHTTPHeaderField: "Content-Type")
    urlRequest.addValue("application/json", forHTTPHeaderField: "Accept")
    
    urlRequest.httpMethod = "POST"
    
    guard let data = try? JSONEncoder().encode(package) else { return }
            
    urlRequest.httpBody = data

    let task = URLSession.shared.dataTask(with: urlRequest) {
        (data, response, error) in
        if let error = error {
            print(error)
            return
        }
        guard let data = data else { return }
        guard let dataString = String(data: data, encoding: String.Encoding.utf8) else { return }
        let jsonData = Data(dataString.utf8)
        var decodedJob: Job? = nil
        do {
            decodedJob = try JSONDecoder().decode(Job.self, from: jsonData) //Job is just a struct in the same form as the json object sent back from the server
        } catch {
            print(error.localizedDescription)
        }
        DispatchQueue.main.async {
            self.appState.pendingJob = decodedJob
        }
    }
    // start the task
    task.resume()
}

The only consistent part of this bug is the logs right before the user disconnects (side note: 'reason of disconnect' and 'DISCONNECTED USER' fire on the socket.on('disconnect') event:

https://i.stack.imgur.com/7fjuU.png

https://i.stack.imgur.com/z5bmL.png

https://i.stack.imgur.com/aHNt3.png

https://i.stack.imgur.com/64WYI.png

like image 399
nickcoding2 Avatar asked Nov 06 '22 00:11

nickcoding2


1 Answers

You should be blocking the event loop with await. There is a heartbeat that the client sends every once in a while (which is defined with pingTimeout).

Since no ping is received by the server, it is disconnected.

You should isolate this process. Either find a way to use it with a worker/background process or async, additionally increasing pingTimeout on serverside might help you.

like image 166
siniradam Avatar answered Nov 11 '22 05:11

siniradam