diff options
author | Jeremy Daer <jeremydaer@gmail.com> | 2016-04-14 23:04:42 -0700 |
---|---|---|
committer | Jeremy Daer <jeremydaer@gmail.com> | 2016-04-18 23:29:51 -0700 |
commit | 3ba0eec20c79923ee701b13f297cc21a6f0f4a9b (patch) | |
tree | d984b35cb3b53e4279ae213e95d14e524c0f2daf /actioncable/lib/action_cable/server | |
parent | 7ad4690b2149fbb23faa179c21698b92ff383c73 (diff) | |
download | rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.gz rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.bz2 rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.zip |
Cable: Extract stream handler construction
* Use separate stream handler builders for easy override and testing.
* Fix worker pool execution that was silently failing since it only
expected connection receivers.
Sparked by code in #24162.
Diffstat (limited to 'actioncable/lib/action_cable/server')
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 46a8989f34..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e |