aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-01-24 19:00:06 +1030
committerMatthew Draper <matthew@trebex.net>2016-01-24 19:00:06 +1030
commit7363ad43f598d6833d3c773f56e58a9f5d6e2e1f (patch)
treeba56f403a2ca56b01917e5212e9a99076fd66294 /actioncable/lib/action_cable/subscription_adapter
parentb17a7e4c4daa4ab54223b0c5e2d03b491c070e0e (diff)
downloadrails-7363ad43f598d6833d3c773f56e58a9f5d6e2e1f.tar.gz
rails-7363ad43f598d6833d3c773f56e58a9f5d6e2e1f.tar.bz2
rails-7363ad43f598d6833d3c773f56e58a9f5d6e2e1f.zip
Allow subscription adapters to be shut down
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb41
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb5
4 files changed, 40 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
index 11910803e8..796db5ffa3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/base.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -19,6 +19,10 @@ module ActionCable
def unsubscribe(channel, message_callback)
raise NotImplementedError
end
+
+ def shutdown
+ raise NotImplementedError
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 19747601be..4a2a8d23a2 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -13,6 +13,10 @@ module ActionCable
subscriber_map.remove_subscriber(channel, callback)
end
+ def shutdown
+ # nothing to do
+ end
+
private
def subscriber_map
@subscriber_map ||= SubscriberMap.new
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 37247634db..78f8aeb599 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -19,6 +19,10 @@ module ActionCable
listener.remove_subscriber(channel, callback)
end
+ def shutdown
+ listener.shutdown
+ end
+
def with_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection
@@ -43,7 +47,7 @@ module ActionCable
@adapter = adapter
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,26 +55,35 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ ::EM.next_tick(&callback) if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- broadcast(chan, message)
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
+ end
end
end
end
end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
+
def add_channel(channel, on_success)
@queue.push([:listen, channel, on_success])
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 90abf21367..3b86354621 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -20,6 +20,11 @@ module ActionCable
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
end
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
private
def redis_connection_for_subscriptions
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|