 

Postgres LISTEN/NOTIFY in Rails

Ryan Bates mentions the LISTEN/NOTIFY functionality of Postgres when discussing push notifications in this episode, but I haven't been able to find any hint on how to implement LISTEN/NOTIFY in my Rails app.

Here is documentation for a wait_for_notify function inside of the pg adapter, but I can't figure out what exactly it does or what it is designed for.

Do we need to tap directly into the connection variable of the pg adapter?

Tyler DeWitt asked May 06 '13 19:05



1 Answer

You're looking in the right place with the wait_for_notify method, but since ActiveRecord apparently doesn't provide an API for using it, you'll need to get at the underlying PG::Connection object (or one of them, if you're running a multithreaded setup) that ActiveRecord is using to talk to Postgres.

Once you've got the connection, execute whatever LISTEN statements you need, then pass a block (and an optional timeout period) to wait_for_notify. Note that this will block the current thread, and monopolize the Postgres connection, until the timeout is reached or a NOTIFY occurs (so you wouldn't want to do this inside a web request, for example). When another process issues a NOTIFY on one of the channels you're listening to, the block will be called with three arguments - the channel that was notified, the pid of the Postgres backend that triggered the NOTIFY, and the payload that accompanied the NOTIFY (if any).

I haven't used ActiveRecord in quite a while, so there may be a cleaner way to do this, but this seems to work alright in 4.0.0.beta1:

# Be sure to check out a connection, so we stay thread-safe.
ActiveRecord::Base.connection_pool.with_connection do |connection|
  # connection is the ActiveRecord::ConnectionAdapters::PostgreSQLAdapter object.
  # raw_connection is the adapter's public accessor for the underlying
  # PG::Connection object, which exposes #wait_for_notify.
  conn = connection.raw_connection

  begin
    conn.async_exec "LISTEN channel1"
    conn.async_exec "LISTEN channel2"

    # This will block until a NOTIFY is issued on one of these two channels.
    conn.wait_for_notify do |channel, pid, payload|
      puts "Received a NOTIFY on channel #{channel}"
      puts "from PG backend #{pid}"
      puts "saying #{payload}"
    end

    # Note that you'll need to call wait_for_notify again if you want to pick
    # up further notifications. This time, bail out if we don't get a
    # notification within half a second.
    conn.wait_for_notify(0.5) do |channel, pid, payload|
      puts "Received a second NOTIFY on channel #{channel}"
      puts "from PG backend #{pid}"
      puts "saying #{payload}"
    end
  ensure
    # Don't want the connection to still be listening once we return
    # it to the pool - could result in weird behavior for the next
    # thread to check it out.
    conn.async_exec "UNLISTEN *"
  end
end

For an example of a more general usage, see Sequel's implementation.

Edit to add: Here's another description of what's going on. This may not be the exact implementation behind the scenes, but it seems to describe the behavior well enough.

Postgres keeps a list of notifications for each connection. When you use a connection to execute LISTEN channel_name, you're telling Postgres that any notifications on that channel should be pushed to this connection's list (multiple connections can listen to the same channel, so a single notification can wind up being pushed to many lists). A connection can LISTEN to many channels at the same time, and notifications to any of them will all be pushed to the same list.
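For the sending side of that picture, here is a minimal sketch of issuing a notification from some other connection. The helper name notify is made up for illustration, and conn is assumed to be a PG::Connection (or anything that responds to exec_params the same way). It relies on Postgres's pg_notify function, which is the function form of the NOTIFY statement.

```ruby
# Hypothetical helper (not part of ActiveRecord or the pg gem): send a
# notification through any object that behaves like a PG::Connection.
def notify(conn, channel, payload = "")
  # pg_notify(channel, payload) is the function form of NOTIFY. Passing the
  # values as bind parameters avoids quoting the channel or payload by hand.
  conn.exec_params('SELECT pg_notify($1, $2)', [channel, payload])
end
```

Every connection that has executed LISTEN channel1 would then get its own copy of a notification sent with notify(conn, "channel1", "hello").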

What wait_for_notify does is pop the oldest notification off of the connection's list and pass its information to the block - or, if the list is empty, sleep until a notification becomes available and do the same for that (or until the timeout is reached, in which case it just returns nil). Since wait_for_notify only handles a single notification, you're going to have to call it repeatedly if you want to handle multiple notifications.
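Calling it repeatedly could look something like the sketch below. The helper name each_notification is invented here, and conn is assumed to be a PG::Connection that has already executed its LISTEN statements. It leans on wait_for_notify returning the channel name when it handled a notification and nil when the timeout expired.

```ruby
# Hypothetical helper: keep handling notifications until none arrives
# within `timeout` seconds. `conn` is assumed to behave like a
# PG::Connection that is already LISTENing on one or more channels.
def each_notification(conn, timeout)
  loop do
    # wait_for_notify handles at most one notification per call. It returns
    # the channel name if one was handled, or nil if the timeout expired.
    channel = conn.wait_for_notify(timeout) do |name, pid, payload|
      yield name, pid, payload
    end
    break if channel.nil?
  end
end
```

For example, each_notification(conn, 5) { |channel, pid, payload| puts "#{channel}: #{payload}" } would keep printing notifications until five seconds pass without one. Like the single call, this blocks the thread and ties up the connection while it runs.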

When you UNLISTEN channel_name or UNLISTEN *, Postgres will stop pushing those notifications to your connection's list, but the ones that have already been pushed to that list will stay there, and wait_for_notify will still return them when it is next called. This might cause an issue where notifications that accumulate after the last wait_for_notify but before UNLISTEN stick around and are still present when another thread checks out that connection. In that case, after UNLISTEN you might want to call wait_for_notify with short timeouts until it returns nil. Unless you're making heavy use of LISTEN and NOTIFY for many different purposes, though, it's probably not worth worrying about.
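If you do want to clear out leftovers, the drain step described above might be sketched like this. The helper name drain_notifications and the 0.01 second default are arbitrary choices for illustration, and conn is again assumed to be a PG::Connection on which UNLISTEN has already been executed.

```ruby
# Hypothetical helper: throw away notifications still queued on the
# connection. Intended to run after UNLISTEN, before the connection goes
# back to the pool. `conn` is assumed to behave like a PG::Connection.
def drain_notifications(conn, timeout = 0.01)
  discarded = 0
  # Without a block, wait_for_notify just returns the channel name of the
  # next queued notification, or nil once the timeout passes with none left.
  discarded += 1 while conn.wait_for_notify(timeout)
  discarded
end
```

The return value is the number of notifications that were discarded, which may be handy for logging how often this actually happens.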

I added a better link to Sequel's implementation above, and I'd recommend looking at it. It's pretty straightforward.

PreciousBodilyFluids answered Oct 12 '22 12:10
