aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
authorJeremy Daer <jeremydaer@gmail.com>2016-04-14 23:04:42 -0700
committerJeremy Daer <jeremydaer@gmail.com>2016-04-18 23:29:51 -0700
commit3ba0eec20c79923ee701b13f297cc21a6f0f4a9b (patch)
treed984b35cb3b53e4279ae213e95d14e524c0f2daf /actioncable
parent7ad4690b2149fbb23faa179c21698b92ff383c73 (diff)
downloadrails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.gz
rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.bz2
rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.zip
Cable: Extract stream handler construction
* Use separate stream handler builders for easy override and testing. * Fix worker pool execution that was silently failing since it only expected connection receivers. Sparked by code in #24162.
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb62
-rw-r--r--actioncable/lib/action_cable/server/worker.rb18
-rw-r--r--actioncable/test/channel/stream_test.rb1
-rw-r--r--actioncable/test/client_test.rb4
-rw-r--r--actioncable/test/test_helper.rb11
-rw-r--r--actioncable/test/worker_test.rb4
6 files changed, 75 insertions, 25 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 84594fb3d6..200c9d053c 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -73,18 +73,13 @@ module ActionCable
# Defaults to `coder: nil` which does no decoding, passes raw messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
+
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- if user_handler = callback || block
- user_handler = -> message { handler.(coder.decode(message)) } if coder
- handler = -> message do
- connection.worker_pool.async_invoke(user_handler, :call, message)
- end
- else
- handler = default_stream_handler(broadcasting, coder: coder)
- end
-
+ # Build a stream handler by wrapping the user-provided callback with
+ # a decoder or defaulting to a JSON-decoding retransmitter.
+ handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
streams << [ broadcasting, handler ]
connection.server.event_loop.post do
@@ -120,13 +115,60 @@ module ActionCable
@_streams ||= []
end
+ # Always wrap the outermost handler to invoke the user handler on the
+ # worker pool rather than blocking the event loop.
+ def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
+ handler = stream_handler(broadcasting, user_handler, coder: coder)
+
+ -> message do
+ connection.worker_pool.async_invoke handler, :call, message, connection: connection
+ end
+ end
+
+ # May be overridden to add instrumentation, logging, specialized error
+ # handling, or other forms of handler decoration.
+ #
+ # TODO: Tests demonstrating this.
+ def stream_handler(broadcasting, user_handler, coder: nil)
+ if user_handler
+ stream_decoder user_handler, coder: coder
+ else
+ default_stream_handler broadcasting, coder: coder
+ end
+ end
+
+ # May be overridden to change the default stream handling behavior
+ # which decodes JSON and transmits to client.
+ #
+ # TODO: Tests demonstrating this.
+ #
+ # TODO: Room for optimization. Update transmit API to be coder-aware
+ # so we can no-op when pubsub and connection are both JSON-encoded.
+ # Then we can skip decode+encode if we're just proxying messages.
def default_stream_handler(broadcasting, coder:)
coder ||= ActiveSupport::JSON
+ stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
+ end
+
+ def stream_decoder(handler = identity_handler, coder:)
+ if coder
+ -> message { handler.(coder.decode(message)) }
+ else
+ handler
+ end
+ end
+
+ def stream_transmitter(handler = identity_handler, broadcasting:)
+ via = "streamed from #{broadcasting}"
-> (message) do
- transmit coder.decode(message), via: "streamed from #{broadcasting}"
+ transmit handler.(message), via: via
end
end
+
+ def identity_handler
+ -> message { message }
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index 46a8989f34..a638ff72e7 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -12,8 +12,10 @@ module ActionCable
define_callbacks :work
include ActiveRecordConnectionManagement
+ attr_reader :executor
+
def initialize(max_size: 5)
- @pool = Concurrent::ThreadPoolExecutor.new(
+ @executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
@@ -23,11 +25,11 @@ module ActionCable
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
- @pool.kill
+ @executor.kill
end
def stopping?
- @pool.shuttingdown?
+ @executor.shuttingdown?
end
def work(connection)
@@ -40,14 +42,14 @@ module ActionCable
self.connection = nil
end
- def async_invoke(receiver, method, *args)
- @pool.post do
- invoke(receiver, method, *args)
+ def async_invoke(receiver, method, *args, connection: receiver)
+ @executor.post do
+ invoke(receiver, method, *args, connection: connection)
end
end
- def invoke(receiver, method, *args)
- work(receiver) do
+ def invoke(receiver, method, *args, connection:)
+ work(connection) do
begin
receiver.send method, *args
rescue Exception => e
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index df129a7c62..0b0c72ccf6 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -142,6 +142,7 @@ module ActionCable::StreamTests
connection.websocket.expects(:transmit)
@server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder
wait_for_async
+ wait_for_executor connection.server.worker_pool.executor
end
end
diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb
index 5ac453db35..fe503fd703 100644
--- a/actioncable/test/client_test.rb
+++ b/actioncable/test/client_test.rb
@@ -226,7 +226,9 @@ class ClientTest < ActionCable::TestCase
assert_equal(1, app.connections.count)
assert(app.remote_connections.where(identifier: identifier))
- channel = app.connections.first.subscriptions.send(:subscriptions).first[1]
+ subscriptions = app.connections.first.subscriptions.send(:subscriptions)
+ assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription'
+ channel = subscriptions.first[1]
channel.expects(:unsubscribed)
c.close
sleep 0.1 # Data takes a moment to process
diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb
index de1ee96770..0a9ee7ce77 100644
--- a/actioncable/test/test_helper.rb
+++ b/actioncable/test/test_helper.rb
@@ -49,10 +49,7 @@ end
module ConcurrentRubyConcurrencyHelpers
def wait_for_async
- e = Concurrent.global_io_executor
- until e.completed_task_count == e.scheduled_task_count
- sleep 0.1
- end
+ wait_for_executor Concurrent.global_io_executor
end
def run_in_eventmachine
@@ -67,4 +64,10 @@ class ActionCable::TestCase < ActiveSupport::TestCase
else
include ConcurrentRubyConcurrencyHelpers
end
+
+ def wait_for_executor(executor)
+ until executor.completed_task_count == executor.scheduled_task_count
+ sleep 0.1
+ end
+ end
end
diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb
index 7016da3493..e2c81fe312 100644
--- a/actioncable/test/worker_test.rb
+++ b/actioncable/test/worker_test.rb
@@ -33,12 +33,12 @@ class WorkerTest < ActiveSupport::TestCase
end
test "invoke" do
- @worker.invoke @receiver, :run
+ @worker.invoke @receiver, :run, connection: @receiver.connection
assert_equal :run, @receiver.last_action
end
test "invoke with arguments" do
- @worker.invoke @receiver, :process, "Hello"
+ @worker.invoke @receiver, :process, "Hello", connection: @receiver.connection
assert_equal [ :process, "Hello" ], @receiver.last_action
end
end