aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-03-02 02:56:53 +1030
committerMatthew Draper <matthew@trebex.net>2016-03-02 02:56:53 +1030
commit541e4abb4b3710a384aefac83cafd0ab878c60bf (patch)
tree55603da9b6f9407a24c68e1d1fd53e109c4131e7 /actioncable/lib/action_cable
parentecdc0fbc872ea734adf29b3f9f3463b916235286 (diff)
parent60b53e9883bfd9f4edb640dbe2de89227b875e09 (diff)
downloadrails-541e4abb4b3710a384aefac83cafd0ab878c60bf.tar.gz
rails-541e4abb4b3710a384aefac83cafd0ab878c60bf.tar.bz2
rails-541e4abb4b3710a384aefac83cafd0ab878c60bf.zip
Merge pull request #23807 from matthewd/executor
Publish AS::Executor and AS::Reloader APIs
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb5
-rw-r--r--actioncable/lib/action_cable/engine.rb25
-rw-r--r--actioncable/lib/action_cable/server/base.rb10
-rw-r--r--actioncable/lib/action_cable/server/worker.rb48
-rw-r--r--actioncable/lib/action_cable/server/worker/active_record_connection_management.rb3
5 files changed, 67 insertions, 24 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 60f3ad3e06..afe0d958d7 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -48,12 +48,13 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger
- delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool
+ delegate :stream_event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
+ @worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index ae0c59dccd..c90aadaf2c 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -51,5 +51,30 @@ module ActionCable
end
end
end
+
+ initializer "action_cable.set_work_hooks" do |app|
+ ActiveSupport.on_load(:action_cable) do
+ ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
+ app.executor.wrap do
+ # If we took a while to get the lock, we may have been halted
+ # in the meantime. As we haven't started doing any real work
+ # yet, we should pretend that we never made it off the queue.
+ unless stopping?
+ inner.call
+ end
+ end
+ end
+
+ wrap = lambda do |_, inner|
+ app.executor.wrap(&inner)
+ end
+ ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
+ ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
+
+ app.reloader.before_class_unload do
+ ActionCable.server.restart
+ end
+ end
+ end
end
end
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index c3b64299e3..d9a2653cc2 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -33,6 +33,16 @@ module ActionCable
remote_connections.where(identifiers).disconnect
end
+ def restart
+ connections.each(&:close)
+
+ @mutex.synchronize do
+ worker_pool.halt if @worker_pool
+
+ @worker_pool = nil
+ end
+ end
+
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index b920b880db..49cbaec0c0 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -20,6 +20,26 @@ module ActionCable
)
end
+ # Stop processing work: any work that has not already started
+ # running will be discarded from the queue
+ def halt
+ @pool.kill
+ end
+
+ def stopping?
+ @pool.shuttingdown?
+ end
+
+ def work(connection)
+ self.connection = connection
+
+ run_callbacks :work do
+ yield
+ end
+ ensure
+ self.connection = nil
+ end
+
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
@@ -27,19 +47,15 @@ module ActionCable
end
def invoke(receiver, method, *args)
- begin
- self.connection = receiver
-
- run_callbacks :work do
+ work(receiver) do
+ begin
receiver.send method, *args
- end
- rescue Exception => e
- logger.error "There was an exception - #{e.class}(#{e.message})"
- logger.error e.backtrace.join("\n")
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
- receiver.handle_exception if receiver.respond_to?(:handle_exception)
- ensure
- self.connection = nil
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ end
end
end
@@ -50,14 +66,8 @@ module ActionCable
end
def run_periodic_timer(channel, callback)
- begin
- self.connection = channel.connection
-
- run_callbacks :work do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
- end
- ensure
- self.connection = nil
+ work(channel.connection) do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
index 1ac8934410..c1e4aa8103 100644
--- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
+++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -1,7 +1,6 @@
module ActionCable
module Server
class Worker
- # Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections.
module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
@@ -13,8 +12,6 @@ module ActionCable
def with_database_connections
connection.logger.tag(ActiveRecord::Base.logger) { yield }
- ensure
- ActiveRecord::Base.clear_active_connections!
end
end
end