aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/channel/streams.rb
diff options
context:
space:
mode:
authorJeremy Daer <jeremydaer@gmail.com>2016-04-14 23:04:42 -0700
committerJeremy Daer <jeremydaer@gmail.com>2016-04-18 23:29:51 -0700
commit3ba0eec20c79923ee701b13f297cc21a6f0f4a9b (patch)
treed984b35cb3b53e4279ae213e95d14e524c0f2daf /actioncable/lib/action_cable/channel/streams.rb
parent7ad4690b2149fbb23faa179c21698b92ff383c73 (diff)
downloadrails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.gz
rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.tar.bz2
rails-3ba0eec20c79923ee701b13f297cc21a6f0f4a9b.zip
Cable: Extract stream handler construction
* Use separate stream handler builders for easy override and testing. * Fix worker pool execution that was silently failing since it only expected connection receivers. Sparked by code in #24162.
Diffstat (limited to 'actioncable/lib/action_cable/channel/streams.rb')
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb62
1 files changed, 52 insertions, 10 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 84594fb3d6..200c9d053c 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -73,18 +73,13 @@ module ActionCable
# Defaults to `coder: nil` which does no decoding, passes raw messages.
def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
+
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- if user_handler = callback || block
- user_handler = -> message { handler.(coder.decode(message)) } if coder
- handler = -> message do
- connection.worker_pool.async_invoke(user_handler, :call, message)
- end
- else
- handler = default_stream_handler(broadcasting, coder: coder)
- end
-
+ # Build a stream handler by wrapping the user-provided callback with
+ # a decoder or defaulting to a JSON-decoding retransmitter.
+ handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
streams << [ broadcasting, handler ]
connection.server.event_loop.post do
@@ -120,13 +115,60 @@ module ActionCable
@_streams ||= []
end
+ # Always wrap the outermost handler to invoke the user handler on the
+ # worker pool rather than blocking the event loop.
+ def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
+ handler = stream_handler(broadcasting, user_handler, coder: coder)
+
+ -> message do
+ connection.worker_pool.async_invoke handler, :call, message, connection: connection
+ end
+ end
+
+ # May be overridden to add instrumentation, logging, specialized error
+ # handling, or other forms of handler decoration.
+ #
+ # TODO: Tests demonstrating this.
+ def stream_handler(broadcasting, user_handler, coder: nil)
+ if user_handler
+ stream_decoder user_handler, coder: coder
+ else
+ default_stream_handler broadcasting, coder: coder
+ end
+ end
+
+ # May be overridden to change the default stream handling behavior
+ # which decodes JSON and transmits to client.
+ #
+ # TODO: Tests demonstrating this.
+ #
+ # TODO: Room for optimization. Update transmit API to be coder-aware
+ # so we can no-op when pubsub and connection are both JSON-encoded.
+ # Then we can skip decode+encode if we're just proxying messages.
def default_stream_handler(broadcasting, coder:)
coder ||= ActiveSupport::JSON
+ stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
+ end
+
+ def stream_decoder(handler = identity_handler, coder:)
+ if coder
+ -> message { handler.(coder.decode(message)) }
+ else
+ handler
+ end
+ end
+
+ def stream_transmitter(handler = identity_handler, broadcasting:)
+ via = "streamed from #{broadcasting}"
-> (message) do
- transmit coder.decode(message), via: "streamed from #{broadcasting}"
+ transmit handler.(message), via: via
end
end
+
+ def identity_handler
+ -> message { message }
+ end
end
end
end