aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb32
1 files changed, 24 insertions, 8 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index e384ea4afb..50ec438c3a 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -14,7 +14,7 @@ module ActionCable
end
def broadcast(channel, payload)
- with_connection do |pg_conn|
+ with_broadcast_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel_identifier(channel))}, '#{pg_conn.escape_string(payload)}'")
end
end
@@ -31,14 +31,24 @@ module ActionCable
listener.shutdown
end
- def with_connection(&block) # :nodoc:
- ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
- pg_conn = ar_conn.raw_connection
+ def with_subscriptions_connection(&block) # :nodoc:
+ ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn|
+ # Action Cable is taking ownership over this database connection, and
+ # will perform the necessary cleanup tasks
+ ActiveRecord::Base.connection_pool.remove(conn)
+ end
+ pg_conn = ar_conn.raw_connection
- unless pg_conn.is_a?(PG::Connection)
- raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter"
- end
+ verify!(pg_conn)
+ yield pg_conn
+ ensure
+ ar_conn.disconnect!
+ end
+ def with_broadcast_connection(&block) # :nodoc:
+ ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
+ pg_conn = ar_conn.raw_connection
+ verify!(pg_conn)
yield pg_conn
end
end
@@ -52,6 +62,12 @@ module ActionCable
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
+ def verify!(pg_conn)
+ unless pg_conn.is_a?(PG::Connection)
+ raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter"
+ end
+ end
+
class Listener < SubscriberMap
def initialize(adapter, event_loop)
super()
@@ -67,7 +83,7 @@ module ActionCable
end
def listen
- @adapter.with_connection do |pg_conn|
+ @adapter.with_subscriptions_connection do |pg_conn|
catch :shutdown do
loop do
until @queue.empty?