aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/server/worker.rb
diff options
context:
space:
mode:
authorJeremy Daer <jeremydaer@gmail.com>2016-04-14 23:04:42 -0700
committerJeremy Daer <jeremydaer@gmail.com>2016-04-18 23:29:51 -0700
commit3ba0eec20c79923ee701b13f297cc21a6f0f4a9b (patch)
treed984b35cb3b53e4279ae213e95d14e524c0f2daf /actioncable/lib/action_cable/server/worker.rb
parent7ad4690b2149fbb23faa179c21698b92ff383c73 (diff)
downloadrails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.gz
rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.bz2
rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.zip
Cable: Extract stream handler construction
* Use separate stream handler builders for easy override and testing. * Fix worker pool execution that was silently failing since it only expected connection receivers. Sparked by code in #24162.
Diffstat (limited to 'actioncable/lib/action_cable/server/worker.rb')
-rw-r--r--actioncable/lib/action_cable/server/worker.rb18
1 files changed, 10 insertions, 8 deletions
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index 46a8989f34..a638ff72e7 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -12,8 +12,10 @@ module ActionCable
define_callbacks :work
include ActiveRecordConnectionManagement
+ attr_reader :executor
+
def initialize(max_size: 5)
- @pool = Concurrent::ThreadPoolExecutor.new(
+ @executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
@@ -23,11 +25,11 @@ module ActionCable
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
- @pool.kill
+ @executor.kill
end
def stopping?
- @pool.shuttingdown?
+ @executor.shuttingdown?
end
def work(connection)
@@ -40,14 +42,14 @@ module ActionCable
self.connection = nil
end
- def async_invoke(receiver, method, *args)
- @pool.post do
- invoke(receiver, method, *args)
+ def async_invoke(receiver, method, *args, connection: receiver)
+ @executor.post do
+ invoke(receiver, method, *args, connection: connection)
end
end
- def invoke(receiver, method, *args)
- work(receiver) do
+ def invoke(receiver, method, *args, connection:)
+ work(connection) do
begin
receiver.send method, *args
rescue Exception => e