aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb30
1 files changed, 16 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index f55b56a2b5..afa99355e8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -8,11 +8,11 @@ end
module ActionCable
module SubscriptionAdapter
- class PostgreSQL < Base
+ class PostgreSQL < Base # :nodoc:
# The storage instance used for broadcasting. Not intended for direct user use.
def broadcast(channel, payload)
with_connection do |pg_conn|
- pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
+ pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
end
end
@@ -21,7 +21,7 @@ module ActionCable
end
def unsubscribe(channel, callback)
- listener.unsubscribe_to(channel, callback)
+ listener.unsubscribe_from(channel, callback)
end
def with_connection(&block) # :nodoc:
@@ -58,18 +58,20 @@ module ActionCable
@adapter.with_connection do |pg_conn|
loop do
until @queue.empty?
- value = @queue.pop(true)
- if value.first == :listen
- pg_conn.exec("LISTEN #{value[1]}")
- ::EM.next_tick(&value[2]) if value[2]
- elsif value.first == :unlisten
- pg_conn.exec("UNLISTEN #{value[1]}")
+ action, channel, callback = @queue.pop(true)
+ escaped_channel = pg_conn.escape_identifier(channel)
+
+ if action == :listen
+ pg_conn.exec("LISTEN #{escaped_channel}")
+ ::EM.next_tick(&callback) if callback
+ elsif action == :unlisten
+ pg_conn.exec("UNLISTEN #{escaped_channel}")
end
+ end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
- end
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ ::EM.next_tick { callback.call(message) }
end
end
end
@@ -86,7 +88,7 @@ module ActionCable
end
end
- def unsubscribe_to(channel, callback)
+ def unsubscribe_from(channel, callback)
@sync.synchronize do
@subscribers[channel].delete(callback)