aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/base.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb41
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb5
4 files changed, 40 insertions, 14 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb
index 11910803e8..796db5ffa3 100644
--- a/actioncable/lib/action_cable/subscription_adapter/base.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/base.rb
@@ -19,6 +19,10 @@ module ActionCable
def unsubscribe(channel, message_callback)
raise NotImplementedError
end
+
+ def shutdown
+ raise NotImplementedError
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 19747601be..4a2a8d23a2 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -13,6 +13,10 @@ module ActionCable
subscriber_map.remove_subscriber(channel, callback)
end
+ def shutdown
+ # nothing to do
+ end
+
private
def subscriber_map
@subscriber_map ||= SubscriberMap.new
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 37247634db..78f8aeb599 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -19,6 +19,10 @@ module ActionCable
listener.remove_subscriber(channel, callback)
end
+ def shutdown
+ listener.shutdown
+ end
+
def with_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection
@@ -43,7 +47,7 @@ module ActionCable
@adapter = adapter
@queue = Queue.new
- Thread.new do
+ @thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
@@ -51,26 +55,35 @@ module ActionCable
def listen
@adapter.with_connection do |pg_conn|
- loop do
- until @queue.empty?
- action, channel, callback = @queue.pop(true)
- escaped_channel = pg_conn.escape_identifier(channel)
-
- if action == :listen
- pg_conn.exec("LISTEN #{escaped_channel}")
- ::EM.next_tick(&callback) if callback
- elsif action == :unlisten
- pg_conn.exec("UNLISTEN #{escaped_channel}")
+ catch :shutdown do
+ loop do
+ until @queue.empty?
+ action, channel, callback = @queue.pop(true)
+
+ case action
+ when :listen
+ pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
+ ::EM.next_tick(&callback) if callback
+ when :unlisten
+ pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
+ when :shutdown
+ throw :shutdown
+ end
end
- end
- pg_conn.wait_for_notify(1) do |chan, pid, message|
- broadcast(chan, message)
+ pg_conn.wait_for_notify(1) do |chan, pid, message|
+ broadcast(chan, message)
+ end
end
end
end
end
+ def shutdown
+ @queue.push([:shutdown])
+ Thread.pass while @thread.alive?
+ end
+
def add_channel(channel, on_success)
@queue.push([:listen, channel, on_success])
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 90abf21367..3b86354621 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -20,6 +20,11 @@ module ActionCable
redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
end
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
private
def redis_connection_for_subscriptions
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|