I have an application that reacts to messages sent by clients. One message is reload_credentials, which the application receives any time a new client registers. Handling this message connects to a PostgreSQL database, queries for all the credentials, and stores them in a regular Ruby hash (client_id => client_token).
Some other messages that the application may receive are start, stop, and pause, which are used to keep track of some session times. My point is that I envision the application functioning in the following way:
However, I don't want to block the reactor. Furthermore, suppose a reload_credentials message is next in the queue: I don't want any other message from the queue to be processed until the credentials are reloaded from the DB. Also, while I am processing a certain message (for example, waiting for the credentials query to finish), I want other messages to still be enqueued.
Could you please guide me towards solving such a problem? I'm thinking I may have to use em-synchrony, but I am not sure.
Use one of the PostgreSQL EM drivers, or EM.defer, so that you won't block the reactor.
When you receive the 'reload_credentials' message, just flip a flag that causes all subsequent messages to be enqueued. Once 'reload_credentials' has finished, process all messages from the queue. After the queue is empty, flip the flag back so that messages are again processed as they are received.
EM drivers for PostgreSQL are listed here: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations
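    module Server
      def post_init
        @queue = []
        @loading_credentials = false
      end

      def receive_message(type, data)
        # While a reload is in flight (or older messages are still waiting),
        # queue the message so that arrival order is preserved.
        return @queue << [type, data] if @loading_credentials || !@queue.empty?
        return process_msg(type, data) unless :reload_credentials == type

        @loading_credentials = true
        reload_credentials do
          @loading_credentials = false
          process_queue
        end
      end

      def reload_credentials(&when_done)
        # EM.defer runs the blocking query on EventMachine's thread pool and
        # invokes when_done on the reactor thread once it has finished.
        EM.defer(proc { query_and_load_credentials }, when_done)
      end

      def process_queue
        until @queue.empty?
          type, data = @queue.shift
          process_msg(type, data)
        end
      end

      # lots of other methods
    end

    # Must be called inside an EM.run block.
    EM.start_server(HOST, PORT, Server)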
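One case the snippet above leaves open is a second 'reload_credentials' arriving while the first is still loading: it is queued and later handed to process_msg like any other message. The following is a minimal plain-Ruby sketch (no EventMachine required) of a drain loop that pauses again when it meets a queued reload. GatedDispatcher, the reloader callable and the pending array are hypothetical names used only for illustration; in the real server the reloader would presumably be the EM.defer call.

```ruby
# Sketch only: a dispatcher whose drain loop stops again whenever it
# reaches a queued :reload_credentials message.
class GatedDispatcher
  attr_reader :processed

  # `reloader` is a hypothetical callable that starts an asynchronous reload
  # and invokes the block it is given once the reload has finished.
  def initialize(reloader)
    @reloader  = reloader
    @queue     = []
    @loading   = false
    @processed = []
  end

  # Every message goes through the queue, so ordering is always preserved.
  def receive_message(type, data = nil)
    @queue << [type, data]
    drain
  end

  private

  def drain
    until @loading || @queue.empty?
      type, data = @queue.shift
      if type == :reload_credentials
        @loading = true
        @reloader.call do
          @loading = false
          drain # resume with whatever piled up in the meantime
        end
      else
        @processed << [type, data] # stand-in for process_msg
      end
    end
  end
end

# Simulated async reload: completion callbacks are stored so that this
# script decides when each reload "finishes".
pending = []
dispatcher = GatedDispatcher.new(->(&done) { pending << done })

dispatcher.receive_message(:start, 1)
dispatcher.receive_message(:reload_credentials)
dispatcher.receive_message(:stop, 1)
dispatcher.receive_message(:reload_credentials)
dispatcher.receive_message(:pause, 2)

pending.shift.call # first reload finishes; :stop runs, second reload begins
pending.shift.call # second reload finishes; :pause runs
```

Routing every message through the queue keeps a single code path, so a queued reload is treated exactly like one that arrives on an idle connection.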
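If you want all connections to queue messages whenever any connection receives a 'reload_credentials' message, you'll have to coordinate via the eigenclass (the module's singleton class), so that the flag and the queue are shared rather than kept per connection.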
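A rough plain-Ruby sketch of that coordination, assuming the flag and queue are moved onto the module itself. SharedGate, FakeConnection, PENDING and LOG are hypothetical names for illustration, and the reload is simulated instead of going through EM.defer, so the example runs without EventMachine.

```ruby
# Sketch only: state lives on the module's singleton class, so every
# connection object that mixes the module in shares one flag and one queue.
module SharedGate
  class << self
    attr_accessor :loading_credentials

    def queue
      @queue ||= []
    end
  end
  self.loading_credentials = false

  def receive_message(type, data = nil)
    if SharedGate.loading_credentials
      # Remember which connection the message belongs to.
      SharedGate.queue << [self, type, data]
    elsif type == :reload_credentials
      SharedGate.loading_credentials = true
      reload_credentials { finish_reload }
    else
      process_msg(type, data)
    end
  end

  def finish_reload
    SharedGate.loading_credentials = false
    # Replay queued messages on their own connections; stop early if one of
    # them is another reload and the flag is raised again.
    until SharedGate.loading_credentials || SharedGate.queue.empty?
      conn, type, data = SharedGate.queue.shift
      conn.receive_message(type, data)
    end
  end
end

# Stand-in for an EventMachine connection, for demonstration purposes.
class FakeConnection
  include SharedGate

  PENDING = [] # completion callbacks of simulated reloads
  LOG     = [] # messages that were actually processed

  def initialize(name)
    @name = name
  end

  def reload_credentials(&when_done)
    PENDING << when_done
  end

  def process_msg(type, data)
    LOG << [@name, type, data]
  end
end

a = FakeConnection.new(:a)
b = FakeConnection.new(:b)

a.receive_message(:reload_credentials)
b.receive_message(:start, 42)   # queued, although it arrived on another connection
FakeConnection::PENDING.shift.call # reload finishes; b's message is processed
```

A real server would also need to decide what happens to queued messages whose connection has closed in the meantime; the sketch ignores that.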