diff options
Diffstat (limited to 'actioncable/lib')
-rw-r--r-- | actioncable/lib/action_cable/channel/periodic_timers.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/connection/base.rb | 2 | ||||
-rw-r--r-- | actioncable/lib/action_cable/process/logging.rb | 3 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/base.rb | 3 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 55 |
5 files changed, 21 insertions, 44 deletions
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..25fe8e5e54 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -28,7 +28,7 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do - connection.worker_pool.async_run_periodic_timer(self, callback) + connection.worker_pool.async.run_periodic_timer(self, callback) end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index a8cfdf90f3..977856d656 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -103,7 +103,7 @@ module ActionCable # Invoke a method on the connection asynchronously through the pool of thread workers. def send_async(method, *arguments) - worker_pool.async_invoke(self, method, *arguments) + worker_pool.async.invoke(self, method, *arguments) end # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb index dce637b3ca..72b1a080d1 100644 --- a/actioncable/lib/action_cable/process/logging.rb +++ b/actioncable/lib/action_cable/process/logging.rb @@ -1,7 +1,10 @@ require 'action_cable/server' require 'eventmachine' +require 'celluloid' EM.error_handler do |e| puts "Error raised inside the event loop: #{e.message}" puts e.backtrace.join("\n") end + +Celluloid.logger = ActionCable.server.logger diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index cfd0a65f0f..740e4b301e 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,3 +1,6 @@ +# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10 +require 'celluloid/current' + require 'em-hiredis' module ActionCable diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 3b6c6d44a1..e063b2a2e1 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -1,68 +1,39 @@ +require 'celluloid' require 'active_support/callbacks' -require 'active_support/core_ext/module/attribute_accessors_per_thread' -require 'concurrent' module ActionCable module Server # Worker used by Server.send_async to do connection work in threads. Only for internal use. class Worker include ActiveSupport::Callbacks + include Celluloid - thread_mattr_accessor :connection + attr_reader :connection define_callbacks :work include ActiveRecordConnectionManagement - def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( - min_threads: 1, - max_threads: max_size, - max_queue: 0, - ) - end - - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) - end - end - def invoke(receiver, method, *args) - begin - self.connection = receiver + @connection = receiver - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") - - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + run_callbacks :work do + receiver.send method, *args end - end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) - end + receiver.handle_exception if receiver.respond_to?(:handle_exception) end def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection + @connection = channel.connection - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - ensure - self.connection = nil + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end end private - def logger ActionCable.server.logger end |