From 185c93eb0c8df629033be48e5eef431190c65226 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 25 Feb 2016 14:31:19 +1030 Subject: Use AS::Executor / AS::Reloader to support reloading in ActionCable --- actioncable/CHANGELOG.md | 5 +++ actioncable/lib/action_cable/connection/base.rb | 5 ++- actioncable/lib/action_cable/engine.rb | 25 +++++++++++ actioncable/lib/action_cable/server/base.rb | 10 +++++ actioncable/lib/action_cable/server/worker.rb | 48 +++++++++++++--------- .../worker/active_record_connection_management.rb | 3 -- actioncable/test/client_test.rb | 20 +++++++++ actioncable/test/stubs/test_server.rb | 4 ++ .../rails/generators/rails/app/templates/config.ru | 4 ++ .../generators/rails/app/templates/config.ru.tt | 10 ----- 10 files changed, 100 insertions(+), 34 deletions(-) create mode 100644 railties/lib/rails/generators/rails/app/templates/config.ru delete mode 100644 railties/lib/rails/generators/rails/app/templates/config.ru.tt diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index a6842d77ef..946fdfb3fc 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,8 @@ +* Safely support autoloading and class unloading, by preventing concurrent + loads, and disconnecting all cables during reload. + + *Matthew Draper* + * Ensure ActionCable behaves correctly for non-string queue names. *Jay Hayes* diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 60f3ad3e06..afe0d958d7 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -48,12 +48,13 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool + delegate :stream_event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env + @worker_pool = server.worker_pool @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index ae0c59dccd..c90aadaf2c 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -51,5 +51,30 @@ module ActionCable end end end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index c3b64299e3..d9a2653cc2 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -33,6 +33,16 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + worker_pool.halt if @worker_pool + + @worker_pool = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index b920b880db..49cbaec0c0 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -20,6 +20,26 @@ module ActionCable ) end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @pool.kill + end + + def stopping? + @pool.shuttingdown? + end + + def work(connection) + self.connection = connection + + run_callbacks :work do + yield + end + ensure + self.connection = nil + end + def async_invoke(receiver, method, *args) @pool.post do invoke(receiver, method, *args) @@ -27,19 +47,15 @@ module ActionCable end def invoke(receiver, method, *args) - begin - self.connection = receiver - - run_callbacks :work do + work(receiver) do + begin receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end end end @@ -50,14 +66,8 @@ module ActionCable end def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection - - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - ensure - self.connection = nil + work(channel.connection) do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index 1ac8934410..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,8 +12,6 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 1b07689127..a6619d3bd2 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -127,8 +127,16 @@ class ClientTest < ActionCable::TestCase end @ws.close + wait_for_close + end + + def wait_for_close @closed.wait(WAIT_WHEN_EXPECTING_EVENT) end + + def closed? + @closed.set? + end end def faye_client(port) @@ -220,4 +228,16 @@ class ClientTest < ActionCable::TestCase assert_equal(0, app.connections.count) end end + + def test_server_restart + with_puma_server do |port| + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + + ActionCable.server.restart + c.wait_for_close + assert c.closed? + end + end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 56d132b30a..5916cf1e83 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -17,4 +17,8 @@ class TestServer def stream_event_loop @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new end + + def worker_pool + @worker_pool ||= ActionCable::Server::Worker.new(max_size: 5) + end end diff --git a/railties/lib/rails/generators/rails/app/templates/config.ru b/railties/lib/rails/generators/rails/app/templates/config.ru new file mode 100644 index 0000000000..bd83b25412 --- /dev/null +++ b/railties/lib/rails/generators/rails/app/templates/config.ru @@ -0,0 +1,4 @@ +# This file is used by Rack-based servers to start the application. + +require ::File.expand_path('../config/environment', __FILE__) +run Rails.application diff --git a/railties/lib/rails/generators/rails/app/templates/config.ru.tt b/railties/lib/rails/generators/rails/app/templates/config.ru.tt deleted file mode 100644 index 343c0833d7..0000000000 --- a/railties/lib/rails/generators/rails/app/templates/config.ru.tt +++ /dev/null @@ -1,10 +0,0 @@ -# This file is used by Rack-based servers to start the application. - -require ::File.expand_path('../config/environment', __FILE__) -<%- unless options[:skip_action_cable] -%> - -# Action Cable requires that all classes are loaded in advance -Rails.application.eager_load! -<%- end -%> - -run Rails.application -- cgit v1.2.3