
How should I handle this use case using EventMachine?

I have an application that reacts to messages sent by clients. One message is reload_credentials, which the application receives any time a new client registers. When it arrives, the application connects to a PostgreSQL database, queries all the credentials, and stores them in a regular Ruby hash (client_id => client_token).

Other messages the application may receive are start, stop, and pause, which are used to keep track of session times. I envision the application working in the following way:

  • client sends a message
  • message gets queued
  • queue is being processed

However, I don't want to block the reactor. Furthermore, suppose a reload_credentials message is next in the queue: I don't want any other message from the queue to be processed until the credentials have been reloaded from the DB. Also, while a message is being processed (for example, while waiting for the credentials query to finish), I want other messages to still be enqueued.

Could you please guide me towards solving such a problem? I'm thinking I may have to use em-synchrony, but I am not sure.

Geo asked Sep 06 '12 19:09



1 Answer

Use one of the PostgreSQL EM drivers, or EM.defer, so that you don't block the reactor.

When you receive the 'reload_credentials' message, just flip a flag that causes all subsequent messages to be enqueued. Once the reload has finished, process all messages from the queue. Once the queue is empty, messages go back to being processed as they are received.

EM drivers for PostgreSQL are listed here: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

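module Server
  def post_init
    @queue               = []
    @loading_credentials = false
  end

  # Called with each decoded message (parsing it out of receive_data is not shown).
  def receive_message(type, data)
    # While a reload is in flight, or older messages are still waiting,
    # enqueue so that messages are handled in arrival order.
    return @queue << [type, data] if @loading_credentials || !@queue.empty?
    return process_msg(type, data) unless :reload_credentials == type
    @loading_credentials = true
    reload_credentials do
      @loading_credentials = false
      process_queue
    end
  end

  def reload_credentials(&when_done)
    # The query runs on EM's thread pool; when_done is called back on the
    # reactor thread once it has finished.
    EM.defer( proc { query_and_load_credentials }, when_done )
  end

  def process_queue
    until @queue.empty?
      type, data = @queue.shift
      process_msg(type, data)
    end
  end

  # lots of other methods
end

EM.start_server(HOST, PORT, Server)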

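Note that the queue and flag above are per connection. If you want all connections to queue messages whenever any connection receives a 'reload_credentials' message, you'll have to coordinate through shared state, for example via the module's eigenclass (singleton class).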
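A minimal sketch of that cross-connection coordination, written in plain Ruby so it can be run without a reactor. It keeps the shared queue and flag in one object rather than on the eigenclass itself, which is a substitution on my part. MessageGate, submit, reloader and handler are hypothetical names, not part of EventMachine. The reloader is assumed to start the reload and call the block it is given when the reload is done. Unlike the per-connection code above, a reload_credentials message that is itself sitting in the queue also pauses the drain.

```ruby
# Hypothetical helper: one queue and one "loading" flag shared by every connection.
class MessageGate
  # reloader: callable that starts the reload and invokes its block when finished.
  # handler:  callable invoked as handler.call(conn, type, data) for ordinary messages.
  def initialize(reloader, handler)
    @reloader = reloader
    @handler  = handler
    @queue    = []
    @loading  = false
  end

  def loading?
    @loading
  end

  # Every connection funnels its messages through here, so order is global.
  def submit(conn, type, data)
    @queue << [conn, type, data]
    drain
  end

  private

  # Process queued messages in order; stop as soon as a reload is in flight.
  def drain
    until @loading || @queue.empty?
      conn, type, data = @queue.shift
      if type == :reload_credentials
        @loading = true
        @reloader.call do
          @loading = false
          drain # resume with whatever arrived during the reload
        end
      else
        @handler.call(conn, type, data)
      end
    end
  end
end

# Demo with a stand-in for deferred work: completion blocks are parked in
# `pending` and invoked by hand, where the reactor would normally do it.
pending = []
log     = []
gate = MessageGate.new(
  ->(&done) { pending << done },
  ->(conn, type, _data) { log << [conn, type] }
)

gate.submit(:conn_a, :start, nil)
gate.submit(:conn_b, :reload_credentials, nil)
gate.submit(:conn_a, :pause, nil) # held until the reload completes
gate.submit(:conn_c, :stop, nil)  # held as well
pending.shift.call                # the reload finishes
p log # => [[:conn_a, :start], [:conn_a, :pause], [:conn_c, :stop]]
```

In an EventMachine application the reloader would presumably wrap EM.defer and pass the completion block as its callback, and each connection's receive_message would call submit on a single shared gate. Because EM.defer runs its callback on the reactor thread, the queue and flag are only ever touched from one thread.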

simulacre answered Nov 04 '22 15:11
