aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-24 19:00:06 +1030
committerMatthew Draper <matthew@trebex.net>2016-01-24 19:00:06 +1030
commit7363ad43f598d6833d3c773f56e58a9f5d6e2e1f (patch)
treeba56f403a2ca56b01917e5212e9a99076fd66294 /actioncable/lib/action_cable/subscription_adapter/postgresql.rb
parentb17a7e4c4daa4ab54223b0c5e2d03b491c070e0e (diff)
downloadrails-7363ad43f598d6833d3c773f56e58a9f5d6e2e1f.tar.gz
rails-7363ad43f598d6833d3c773f56e58a9f5d6e2e1f.tar.bz2
rails-7363ad43f598d6833d3c773f56e58a9f5d6e2e1f.zip
Allow subscription adapters to be shut down
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/postgresql.rb')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb41
1 files changed, 27 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 37247634db..78f8aeb599 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -19,6 +19,10 @@ module ActionCable
listener.remove_subscriber(channel, callback)
end
+ def shutdown
+ listener.shutdown
+ end
+
def with_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection
@@ -43,7 +47,7 @@ module ActionCable
@adapter = adapter
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,26 +55,35 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ ::EM.next_tick(&callback) if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- broadcast(chan, message)
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
+ end
end
end
end
end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
+
def add_channel(channel, on_success)
@queue.push([:listen, channel, on_success])
end