diff options
author | Matthew Draper <matthew@trebex.net> | 2016-03-02 02:56:53 +1030 |
---|---|---|
committer | Matthew Draper <matthew@trebex.net> | 2016-03-02 02:56:53 +1030 |
commit | 541e4abb4b3710a384aefac83cafd0ab878c60bf (patch) | |
tree | 55603da9b6f9407a24c68e1d1fd53e109c4131e7 /actioncable/lib/action_cable | |
parent | ecdc0fbc872ea734adf29b3f9f3463b916235286 (diff) | |
parent | 60b53e9883bfd9f4edb640dbe2de89227b875e09 (diff) | |
download | rails-541e4abb4b3710a384aefac83cafd0ab878c60bf.tar.gz rails-541e4abb4b3710a384aefac83cafd0ab878c60bf.tar.bz2 rails-541e4abb4b3710a384aefac83cafd0ab878c60bf.zip |
Merge pull request #23807 from matthewd/executor
Publish AS::Executor and AS::Reloader APIs
Diffstat (limited to 'actioncable/lib/action_cable')
5 files changed, 67 insertions, 24 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 60f3ad3e06..afe0d958d7 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -48,12 +48,13 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool + delegate :stream_event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env + @worker_pool = server.worker_pool @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index ae0c59dccd..c90aadaf2c 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -51,5 +51,30 @@ module ActionCable end end end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index c3b64299e3..d9a2653cc2 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -33,6 +33,16 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + worker_pool.halt if @worker_pool + + @worker_pool = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index b920b880db..49cbaec0c0 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -20,6 +20,26 @@ module ActionCable ) end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @pool.kill + end + + def stopping? + @pool.shuttingdown? + end + + def work(connection) + self.connection = connection + + run_callbacks :work do + yield + end + ensure + self.connection = nil + end + def async_invoke(receiver, method, *args) @pool.post do invoke(receiver, method, *args) @@ -27,19 +47,15 @@ module ActionCable end def invoke(receiver, method, *args) - begin - self.connection = receiver - - run_callbacks :work do + work(receiver) do + begin receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end end end @@ -50,14 +66,8 @@ module ActionCable end def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection - - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - ensure - self.connection = nil + work(channel.connection) do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index 1ac8934410..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,8 +12,6 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end |