Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

Thread running in Middleware is using old version of parent's instance variable

I've used Heroku tutorial to implement websockets.

It works properly with Thin, but does not work with Unicorn and Puma.

Also there's an echo message implemented, which responds to client's message. It works properly on each server, so there are no problems with websockets implementation.

Redis setup is also correct (it catches all messages, and executes the code inside subscribe block).

How does it work now:

On server start, an empty @clients array is initialized. Then new Thread is started, which is listening to Redis and which is intended to send that message to corresponding user from @clients array.

On page load, new websocket connection is created, it is stored in @clients array.

If we receive the message from browser, we send it back to all clients connected with the same user (that part is working properly on both Thin and Puma).

If we receive the message from Redis, we also look up for all user's connections stored in @clients array. This is where weird thing happens:

  • If running with Thin, it finds connections in @clients array and sends the message to them.

  • If running with Puma/Unicorn, @clients array is always empty, even if we try it in that order (without page reload or anything):

    1. Send message from browser -> @clients.length is 1, message is delivered
    2. Send message via Redis -> @clients.length is 0, message is lost
    3. Send message from browser -> @clients.length is still 1, message is delivered

Could someone please clarify me what am I missing?

Related config of Puma server:

workers 1
threads_count = 1
threads threads_count, threads_count

Related middleware code:

require 'faye/websocket'

class NotificationsBackend

  def initialize(app)
    @app     = app
    @clients = []
    Thread.new do
      redis_sub = Redis.new
      redis_sub.subscribe(CHANNEL) do |on|
        on.message do |channel, msg|
          # logging @clients.length from here will always return 0
          # [..] retrieve user
          send_message(user.id, { message: "ECHO: #{event.data}"} )
        end
      end
    end
  end

  def call(env)
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, nil, {ping: KEEPALIVE_TIME })
      ws.on :open do |event|
        # [..] retrieve current user
        if user
          # add ws connection to @clients array
        else
          # close ws
        end
      end

      ws.on :message do |event|
        # [..] retrieve current user
        Redis.current.publish({user_id: user.id, { message: "ECHO: #{event.data}"}} )
      end

      ws.rack_response
    else
      @app.call(env)
    end
  end
  def send_message user_id, message
    # logging @clients.length here will always return correct result
    # cs = all connections which belong to that client
    cs.each { |c| c.send(message.to_json) }
  end
end
like image 464
Felix Borzik Avatar asked Mar 15 '23 18:03

Felix Borzik


1 Answers

Unicorn (and apparently puma) both start up a master process and then fork one or more workers. fork copies (or at least presents the illusion of copying - an actual copy usually only happens as you write to pages) your entire process but only the thread that called fork exists in the new process.

Clearly your app is being initialised before being forked - this is normally done so that workers can start quickly and benefit from copy on write memory savings. As a consequence your redis checking thread is only running in the master process whereas @clients is being modified in the child process.

You can probably work around this by either deferring the creation of your redis thread or disabling app preloading, however you should be aware that your setup will prevent you from scaling beyond a single worker process (which with puma and a thread friendly JVM like jruby would be less of a constraint)

like image 124
Frederick Cheung Avatar answered Mar 18 '23 20:03

Frederick Cheung