Background: We've built a chat feature into one of our existing Rails applications. We're using the new ActionController::Live module, running Puma (with Nginx in production), and subscribing to messages through Redis. On the client side we use EventSource to establish the connection asynchronously.
Problem Summary: Threads are never dying when the connection is terminated.
For example, should the user navigate away, close the browser, or even go to a different page within the application, a new thread is spawned (as expected), but the old one continues to live.
The problem as I presently see it is that when any of these situations occur, the server has no way of knowing whether the connection on the browser's end is terminated, until something attempts to write to this broken stream, which would never happen once the browser has moved away from the original page.
This problem seems to be documented on GitHub, and similar questions have been asked on Stack Overflow here (pretty much the exact same question) and here (regarding getting the number of active threads).
The only solution I've been able to come up with, based on these posts, is to implement a type of thread / connection poker. Attempting to write to a broken connection generates an IOError, which I can catch and then properly close the connection, allowing the thread to die. This is the controller code for that solution:
def events
  response.headers["Content-Type"] = "text/event-stream"
  stream_error = false # used by flusher thread to determine when to stop
  redis = Redis.new
  # Subscribe to our events
  redis.subscribe("message.create", "message.user_list_update") do |on|
    on.message do |event, data| # when message is received, write to stream
      response.stream.write("messageType: '#{event}', data: #{data}\n\n")
    end
    # This is the monitor / connection poker thread
    # Periodically poke the connection by attempting to write to the stream
    # ($redis is a separate global connection; a subscribed connection cannot publish)
    flusher_thread = Thread.new do
      while !stream_error
        $redis.publish "message.create", "flusher_test"
        sleep 2.seconds
      end
    end
  end
rescue IOError
  logger.info "Stream closed"
  stream_error = true
ensure
  logger.info "Events action is quitting redis and closing stream!"
  redis.quit
  response.stream.close
end
(Note: the events method seems to get blocked on the subscribe method invocation. Everything else (the streaming) works properly so I assume this is normal.)
(Other note: the flusher thread concept makes more sense as a single long-running background process, a bit like a garbage thread collector. The problem with my implementation above is that a new thread is spawned for each connection, which is pointless. Anyone attempting to implement this concept should do it more like a single process, not so much as I've outlined. I'll update this post when I successfully re-implement this as a single background process.)
The downside of this solution is that we've only delayed or lessened the problem, not completely solved it. We still have 2 threads per user, in addition to other requests such as ajax, which seems terrible from a scaling perspective; it seems completely unattainable and impractical for a larger system with many possible concurrent connections.
I feel like I am missing something vital; I find it somewhat difficult to believe that Rails has a feature that is so obviously broken without implementing a custom connection-checker like I have done.
Question: How do we allow the connections / threads to die without implementing something corny such as a 'connection poker', or garbage thread collector?
As always let me know if I've left anything out.
Update
Just to add a bit of extra info: Huetsch over at GitHub posted this comment pointing out that SSE is based on TCP, which normally sends a FIN packet when the connection is closed, letting the other end (the server in this case) know that it's safe to close the connection. Huetsch points out that either the browser is not sending that packet (perhaps a bug in the EventSource library?), or Rails is not catching it or doing anything with it (definitely a bug in Rails, if that's the case). The search continues...
Another Update
Using Wireshark, I can indeed see FIN packets being sent. Admittedly, I am not very knowledgeable or experienced with protocol-level stuff; however, from what I can tell, a FIN packet is definitely sent from the browser when I establish the SSE connection using EventSource, and NO packet is sent if I remove that connection (meaning no SSE). Though I'm not terribly up on my TCP knowledge, this seems to indicate that the connection is indeed being properly terminated by the client; perhaps this points to a bug in Puma or Rails.
Yet another update
@JamesBoutcher / boutcheratwest (GitHub) pointed me to a discussion on the Redis website regarding this issue, specifically the fact that the .(p)subscribe method never shuts down. The poster on that site pointed out the same thing that we've discovered here: the Rails environment is never notified when the client-side connection is closed, and is therefore unable to execute the .(p)unsubscribe method. He inquires about a timeout for the .(p)subscribe method, which I think would work as well, though I'm not sure which method (the connection poker I've described above, or his timeout suggestion) would be the better solution. Ideally, for the connection poker solution, I'd like to find a way to determine whether the connection is closed on the other end without writing to the stream. As it is right now, as you can see, I have to implement client-side code to handle my "poking" message separately, which I believe is obtrusive and goofy as heck.
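On the wish to detect a closed connection without writing to it: at the raw TCP level, a FIN from the peer makes the socket readable, and a read then reports end-of-file. The following is only a minimal sketch with plain Ruby sockets, assuming direct access to the socket, which response.stream in ActionController::Live does not expose. It illustrates the mechanism and is not a Rails API; peer_closed? is a hypothetical helper name.

```ruby
require "socket"

# Hypothetical helper: true once the peer has closed its end (FIN received).
# Assumes the peer sends no further data after the request, as with an SSE
# client; any byte that is read here is discarded.
def peer_closed?(socket, timeout = 0)
  readable, = IO.select([socket], nil, nil, timeout)
  return false if readable.nil?   # nothing pending: no sign of a close yet

  socket.read_nonblock(1)         # raises EOFError if the peer sent FIN
  false
rescue EOFError, Errno::ECONNRESET
  true
rescue IO::WaitReadable
  false
end

# Local demonstration on the loopback interface.
server = TCPServer.new("127.0.0.1", 0)              # port 0: let the OS pick a free port
client = TCPSocket.new("127.0.0.1", server.addr[1])
conn   = server.accept

open_before = peer_closed?(conn)       # client is still connected
client.close                           # client goes away, the kernel sends FIN
closed_after = peer_closed?(conn, 2)   # wait up to 2 seconds for the FIN to arrive
puts "before close: #{open_before}, after close: #{closed_after}"

conn.close
server.close
```

Whether a check like this is reachable from inside a Rails controller depends on the server and middleware in between (Nginx, Puma, Rack), so it should be read as an explanation of what the FIN packets seen in Wireshark make possible, not as a drop-in fix.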
A solution I just did (borrowing a lot from @teeg) which seems to work okay (haven't failure tested it, tho)
config/initializers/redis.rb
$redis = Redis.new(:host => "xxxx.com", :port => 6379)
heartbeat_thread = Thread.new do
  while true
    $redis.publish("heartbeat","thump")
    sleep 30.seconds
  end
end
at_exit do
  # not sure this is needed, but just in case
  heartbeat_thread.kill
  $redis.quit
end
And then in my controller:
def events
  response.headers["Content-Type"] = "text/event-stream"
  redis = Redis.new(:host => "xxxxxxx.com", :port => 6379)
  logger.info "New stream starting, connecting to redis"
  redis.subscribe(['parse.new','heartbeat']) do |on|
    on.message do |event, data|
      if event == 'parse.new'
        response.stream.write("event: parse\ndata: #{data}\n\n")
      elsif event == 'heartbeat'
        response.stream.write("event: heartbeat\ndata: heartbeat\n\n")
      end
    end
  end
rescue IOError
  logger.info "Stream closed"
ensure
  logger.info "Stopping stream thread"
  redis.quit
  response.stream.close
end
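A possible refinement of the heartbeat above, sketched under assumptions: the Server-Sent Events format treats any line that begins with a colon as a comment, which EventSource ignores, so the poke needs no client-side handling. The helper name write_heartbeat is hypothetical, and StringIO and IO.pipe are used here only as stand-ins for response.stream.

```ruby
require "stringio"

# An SSE comment line: starts with ":" and is ignored by EventSource clients.
SSE_HEARTBEAT = ": heartbeat\n\n"

# Hypothetical helper: pokes the stream and reports whether the write succeeded.
# Rescues IOError (closed stream) and Errno::EPIPE (broken pipe or socket).
def write_heartbeat(stream)
  stream.write(SSE_HEARTBEAT)
  true
rescue IOError, Errno::EPIPE
  false
end

# Stand-ins for response.stream, used only for this demonstration.
live = StringIO.new
puts write_heartbeat(live)      # the stream is open

closed = StringIO.new
closed.close
puts write_heartbeat(closed)    # writing to a closed StringIO raises IOError

reader, writer = IO.pipe
writer.sync = true              # write through immediately instead of buffering
reader.close                    # simulate the other end going away
puts write_heartbeat(writer)    # writing to a broken pipe raises Errno::EPIPE
```

In the controller above, the heartbeat branch could write a comment line like this in place of the named heartbeat event; the rescue and ensure clauses would stay as they are.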
I'm currently building an app that revolves around ActionController::Live, EventSource and Puma. For those having trouble closing streams: instead of rescuing IOError, in Rails 4.2 you need to rescue ClientDisconnected (ActionController::Live::ClientDisconnected). Example:
def stream
  # An explicit begin is not required; a method body can take rescue/ensure directly
  twitter_client = Twitter::Streaming::Client.new(config_params) do |obj|
    # Do something
  end
rescue ClientDisconnected
  # Do something when disconnected
ensure
  # Do something else to ensure the stream is closed
end
I found this handy tip from this forum post (all the way at the bottom): http://railscasts.com/episodes/401-actioncontroller-live?view=comments
Here's a potentially simpler solution which does not use a heartbeat. After much research and experimentation, here's the code I'm using with sinatra + sinatra sse gem (which should be easily adapted to Rails 4):
class EventServer < Sinatra::Base
  include Sinatra::SSE
  set :connections, []
  .
  .
  .
  get '/channel/:channel' do
    .
    .
    .
    sse_stream do |out|
      settings.connections << out
      out.callback {
        puts 'Client disconnected from sse'
        settings.connections.delete(out)
      }
      redis.subscribe(channel) do |on|
        on.subscribe do |channel, subscriptions|
          puts "Subscribed to redis ##{channel}\n"
        end
        on.message do |channel, message|
          puts "Message from redis ##{channel}: #{message}\n"
          message = JSON.parse(message)
          .
          .
          .
          if settings.connections.include?(out)
            out.push(message)
          else
            puts 'closing orphaned redis connection'
            redis.unsubscribe
          end
        end
      end
    end
  end
end
While subscribed, the Redis connection blocks inside the subscribe loop and only accepts (p)subscribe/(p)unsubscribe commands. Once you unsubscribe, the connection is no longer blocked and can be released by the web server object that was instantiated for the initial SSE request. The cleanup happens automatically the next time a message arrives on Redis and the SSE connection to the browser is no longer in the connections array.
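The cleanup logic described here can be sketched without Sinatra or Redis. This is only an illustration under assumptions: ConnectionRegistry and deliver_or_unsubscribe are hypothetical names, a plain array stands in for the SSE output stream, and a lambda stands in for redis.unsubscribe. The mutex is an addition of this sketch, since the array may be touched from several threads.

```ruby
# Thread-safe collection of live client streams
# (hypothetical stand-in for settings.connections).
class ConnectionRegistry
  def initialize
    @streams = []
    @lock = Mutex.new
  end

  def add(stream)
    @lock.synchronize { @streams << stream }
  end

  def remove(stream)
    @lock.synchronize { @streams.delete(stream) }
  end

  def include?(stream)
    @lock.synchronize { @streams.include?(stream) }
  end
end

# Forwards the message if the client is still registered; otherwise calls the
# supplied unsubscribe callable so the blocking subscribe loop can return.
def deliver_or_unsubscribe(registry, stream, message, unsubscribe)
  if registry.include?(stream)
    stream << message
    :delivered
  else
    unsubscribe.call
    :unsubscribed
  end
end

registry = ConnectionRegistry.new
out = []                                   # stand-in for the SSE output stream
unsubscribed = false
unsubscribe = -> { unsubscribed = true }   # stand-in for redis.unsubscribe

registry.add(out)
first = deliver_or_unsubscribe(registry, out, "hello", unsubscribe)
registry.remove(out)                       # what the disconnect callback would do
second = deliver_or_unsubscribe(registry, out, "late", unsubscribe)
puts [first, second, unsubscribed].inspect
```

One consequence of this design is that an orphaned subscription lingers until the next message arrives on its channel, so a quiet channel keeps its Redis connection open for as long as it stays quiet.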