From d1766ef53ddbd91ff414896983e8f3e6b39d2dec Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Wed, 13 Apr 2016 13:08:00 -0600 Subject: Run Action Cable callbacks through the worker pool Alternate implementation of #24162 with tests. The code had diverged too far on master to pull that implemenation directly. Fixes #23778 Close #24162 [Mattew Draper & Sean Griffin] --- actioncable/lib/action_cable/channel/streams.rb | 7 +++++-- actioncable/test/channel/stream_test.rb | 28 ++++++++++++++++++++++--- 2 files changed, 30 insertions(+), 5 deletions(-) (limited to 'actioncable') diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index f654ce0bfa..8b46ac216a 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -76,8 +76,11 @@ module ActionCable # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - if handler = callback || block - handler = -> message { handler.(coder.decode(message)) } if coder + if user_handler = callback || block + user_handler = -> message { handler.(coder.decode(message)) } if coder + handler = -> message do + connection.worker_pool.async_invoke(user_handler, :call, message) + end else handler = default_stream_handler(broadcasting, coder: coder) end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index f51f19eb7d..df129a7c62 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -116,11 +116,22 @@ module ActionCable::StreamTests require 'action_cable/subscription_adapter/inline' + class UserCallbackChannel < ActionCable::Channel::Base + def subscribed + stream_from :channel do + Thread.current[:ran_callback] = true + end + end + end + class StreamEncodingTest < ActionCable::TestCase setup do @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) @server.config.allowed_request_origins = %w( http://rubyonrails.com ) - @server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel) + @server.stubs(:channel_classes).returns( + ChatChannel.name => ChatChannel, + UserCallbackChannel.name => UserCallbackChannel, + ) end test 'custom encoder' do @@ -134,6 +145,17 @@ module ActionCable::StreamTests end end + test "user supplied callbacks are run through the worker pool" do + run_in_eventmachine do + connection = open_connection + receive(connection, command: 'subscribe', channel: UserCallbackChannel.name, identifiers: { id: 1 }) + + @server.broadcast 'channel', {} + wait_for_async + refute Thread.current[:ran_callback], "User callback was not run through the worker pool" + end + end + private def subscribe_to(connection, identifiers:) receive connection, command: 'subscribe', identifiers: identifiers @@ -151,8 +173,8 @@ module ActionCable::StreamTests end end - def receive(connection, command:, identifiers:) - identifier = JSON.generate(channel: 'ActionCable::StreamTests::ChatChannel', **identifiers) + def receive(connection, command:, identifiers:, channel: 'ActionCable::StreamTests::ChatChannel') + identifier = JSON.generate(channel: channel, **identifiers) connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier) wait_for_async end -- cgit v1.2.3