diff options
author | David Heinemeier Hansson <david@loudthinking.com> | 2016-01-16 15:42:20 +0100 |
---|---|---|
committer | David Heinemeier Hansson <david@loudthinking.com> | 2016-01-16 15:42:20 +0100 |
commit | 01c320001bcce617196270f3d398d48a89a6ea2a (patch) | |
tree | 18ca55a61b9bd0bd73bcad2f8f0dbc61dfc79f69 /actioncable/lib/action_cable/server | |
parent | f5065ef60c011a0d84224e74e1e8a0b882c36223 (diff) | |
download | rails-01c320001bcce617196270f3d398d48a89a6ea2a.tar.gz rails-01c320001bcce617196270f3d398d48a89a6ea2a.tar.bz2 rails-01c320001bcce617196270f3d398d48a89a6ea2a.zip |
Revert "Merge pull request #22977 from rails/revert-22934-master"
This reverts commit d0393fccffc118a5de37654aa222774b66123393, reversing
changes made to 3b7ccadfc1c8dfec61af898167e1300b17f5cf25.
Diffstat (limited to 'actioncable/lib/action_cable/server')
-rw-r--r-- | actioncable/lib/action_cable/server/base.rb | 3 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 55 |
2 files changed, 42 insertions, 16 deletions
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 740e4b301e..cfd0a65f0f 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,6 +1,3 @@ -# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10 -require 'celluloid/current' - require 'em-hiredis' module ActionCable diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index e063b2a2e1..3b6c6d44a1 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -1,39 +1,68 @@ -require 'celluloid' require 'active_support/callbacks' +require 'active_support/core_ext/module/attribute_accessors_per_thread' +require 'concurrent' module ActionCable module Server # Worker used by Server.send_async to do connection work in threads. Only for internal use. class Worker include ActiveSupport::Callbacks - include Celluloid - attr_reader :connection + thread_mattr_accessor :connection define_callbacks :work include ActiveRecordConnectionManagement + def initialize(max_size: 5) + @pool = Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: max_size, + max_queue: 0, + ) + end + + def async_invoke(receiver, method, *args) + @pool.post do + invoke(receiver, method, *args) + end + end + def invoke(receiver, method, *args) - @connection = receiver + begin + self.connection = receiver - run_callbacks :work do - receiver.send method, *args + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + ensure + self.connection = nil end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + end - receiver.handle_exception if receiver.respond_to?(:handle_exception) + def async_run_periodic_timer(channel, callback) + @pool.post do + run_periodic_timer(channel, callback) + end end def run_periodic_timer(channel, callback) - @connection = channel.connection + begin + self.connection = channel.connection - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + ensure + self.connection = nil end end private + def logger ActionCable.server.logger end |