From 67af248d51e9f5f56911c80855d4b14642582ea8 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 21:07:18 -0500 Subject: Small PostgreSQL adapter refactors / cleanup - Escape the channel name when subscribing in PG - Refactor popping the queue to make it easier to read --- .../lib/action_cable/server/configuration.rb | 1 - .../subscription_adapter/postgresql.rb | 30 ++++++++++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 344ae0ad5d..cdf5e9eb1c 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -42,4 +42,3 @@ module ActionCable end end end - diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index f55b56a2b5..afa99355e8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -8,11 +8,11 @@ end module ActionCable module SubscriptionAdapter - class PostgreSQL < Base + class PostgreSQL < Base # :nodoc: # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) with_connection do |pg_conn| - pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") end end @@ -21,7 +21,7 @@ module ActionCable end def unsubscribe(channel, callback) - listener.unsubscribe_to(channel, callback) + listener.unsubscribe_from(channel, callback) end def with_connection(&block) # :nodoc: @@ -58,18 +58,20 @@ module ActionCable @adapter.with_connection do |pg_conn| loop do until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") + action, channel, callback = @queue.pop(true) + escaped_channel = pg_conn.escape_identifier(channel) + + if action == :listen + pg_conn.exec("LISTEN #{escaped_channel}") + ::EM.next_tick(&callback) if callback + elsif action == :unlisten + pg_conn.exec("UNLISTEN #{escaped_channel}") end + end - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } - end + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } end end end @@ -86,7 +88,7 @@ module ActionCable end end - def unsubscribe_to(channel, callback) + def unsubscribe_from(channel, callback) @sync.synchronize do @subscribers[channel].delete(callback) -- cgit v1.2.3