aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
authorJon Moss <me@jonathanmoss.me>2016-01-15 21:07:18 -0500
committerJon Moss <me@jonathanmoss.me>2016-01-18 18:59:31 -0500
commit67af248d51e9f5f56911c80855d4b14642582ea8 (patch)
tree44ce7a887a51ccf6667eed956769d77b6a57afaa /actioncable/lib/action_cable/subscription_adapter
parent4c5d5b75abe85d59e5cc9de9904fdef3b23ec25b (diff)
downloadrails-67af248d51e9f5f56911c80855d4b14642582ea8.tar.gz
rails-67af248d51e9f5f56911c80855d4b14642582ea8.tar.bz2
rails-67af248d51e9f5f56911c80855d4b14642582ea8.zip
Small PostgreSQL adapter refactors / cleanup
- Escape the channel name when subscribing in PG - Refactor popping the queue to make it easier to read
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb30
1 files changed, 16 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index f55b56a2b5..afa99355e8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -8,11 +8,11 @@ end
module ActionCable
module SubscriptionAdapter
- class PostgreSQL < Base
+ class PostgreSQL < Base # :nodoc:
# The storage instance used for broadcasting. Not intended for direct user use.
def broadcast(channel, payload)
with_connection do |pg_conn|
- pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
+ pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
end
end
@@ -21,7 +21,7 @@ module ActionCable
end
def unsubscribe(channel, callback)
- listener.unsubscribe_to(channel, callback)
+ listener.unsubscribe_from(channel, callback)
end
def with_connection(&block) # :nodoc:
@@ -58,18 +58,20 @@ module ActionCable
@adapter.with_connection do |pg_conn|
loop do
until @queue.empty?
- value = @queue.pop(true)
- if value.first == :listen
- pg_conn.exec("LISTEN #{value[1]}")
- ::EM.next_tick(&value[2]) if value[2]
- elsif value.first == :unlisten
- pg_conn.exec("UNLISTEN #{value[1]}")
+ action, channel, callback = @queue.pop(true)
+ escaped_channel = pg_conn.escape_identifier(channel)
+
+ if action == :listen
+ pg_conn.exec("LISTEN #{escaped_channel}")
+ ::EM.next_tick(&callback) if callback
+ elsif action == :unlisten
+ pg_conn.exec("UNLISTEN #{escaped_channel}")
end
+ end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- @subscribers[chan].each do |callback|
- ::EM.next_tick { callback.call(message) }
- end
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ @subscribers[chan].each do |callback|
+ ::EM.next_tick { callback.call(message) }
end
end
end
@@ -86,7 +88,7 @@ module ActionCable
end
end
- def unsubscribe_to(channel, callback)
+ def unsubscribe_from(channel, callback)
@sync.synchronize do
@subscribers[channel].delete(callback)