diff options
Diffstat (limited to 'actioncable/lib/action_cable/server/worker.rb')
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 71 |
1 files changed, 71 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb new file mode 100644 index 0000000000..a638ff72e7 --- /dev/null +++ b/actioncable/lib/action_cable/server/worker.rb @@ -0,0 +1,71 @@ +require 'active_support/callbacks' +require 'active_support/core_ext/module/attribute_accessors_per_thread' +require 'concurrent' + +module ActionCable + module Server + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: + include ActiveSupport::Callbacks + + thread_mattr_accessor :connection + define_callbacks :work + include ActiveRecordConnectionManagement + + attr_reader :executor + + def initialize(max_size: 5) + @executor = Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: max_size, + max_queue: 0, + ) + end + + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @executor.kill + end + + def stopping? + @executor.shuttingdown? + end + + def work(connection) + self.connection = connection + + run_callbacks :work do + yield + end + ensure + self.connection = nil + end + + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) + end + end + + def invoke(receiver, method, *args, connection:) + work(connection) do + begin + receiver.send method, *args + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end + end + end + + private + + def logger + ActionCable.server.logger + end + end + end +end |