diff options
author | Jeremy Daer <jeremydaer@gmail.com> | 2016-04-14 23:04:42 -0700 |
---|---|---|
committer | Jeremy Daer <jeremydaer@gmail.com> | 2016-04-18 23:29:51 -0700 |
commit | 3ba0eec20c79923ee701b13f297cc21a6f0f4a9b (patch) | |
tree | d984b35cb3b53e4279ae213e95d14e524c0f2daf /actioncable/lib | |
parent | 7ad4690b2149fbb23faa179c21698b92ff383c73 (diff) | |
download | rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.gz rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.bz2 rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.zip |
Cable: Extract stream handler construction
* Use separate stream handler builders for easy override and testing.
* Fix worker pool execution that was silently failing since it only
expected connection receivers.
Sparked by code in #24162.
Diffstat (limited to 'actioncable/lib')
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 62 | ||||
-rw-r--r-- | actioncable/lib/action_cable/server/worker.rb | 18 |
2 files changed, 62 insertions, 18 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 84594fb3d6..200c9d053c 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -73,18 +73,13 @@ module ActionCable # Defaults to `coder: nil` which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - if user_handler = callback || block - user_handler = -> message { handler.(coder.decode(message)) } if coder - handler = -> message do - connection.worker_pool.async_invoke(user_handler, :call, message) - end - else - handler = default_stream_handler(broadcasting, coder: coder) - end - + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams << [ broadcasting, handler ] connection.server.event_loop.post do @@ -120,13 +115,60 @@ module ActionCable @_streams ||= [] end + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. def default_stream_handler(broadcasting, coder:) coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" -> (message) do - transmit coder.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 46a8989f34..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e |