aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable
diff options
context:
space:
mode:
authorSean Griffin <sean@seantheprogrammer.com>2016-04-14 11:25:08 -0600
committerSean Griffin <sean@seantheprogrammer.com>2016-04-14 11:25:08 -0600
commit4769fc4b4d5af7a8e93cc497d744151086aa5034 (patch)
treebf3b85588cb1fc27ab6e21cc344b24e89f0ee51e /actioncable
parent91798c75bc56fa706c9ed4a8dab6020462463a10 (diff)
parentd1766ef53ddbd91ff414896983e8f3e6b39d2dec (diff)
downloadrails-4769fc4b4d5af7a8e93cc497d744151086aa5034.tar.gz
rails-4769fc4b4d5af7a8e93cc497d744151086aa5034.tar.bz2
rails-4769fc4b4d5af7a8e93cc497d744151086aa5034.zip
Merge pull request #24540 from sgrif/sg-actioncable-callbacks
Run Action Cable callbacks through the worker pool
Diffstat (limited to 'actioncable')
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb7
-rw-r--r--actioncable/test/channel/stream_test.rb28
2 files changed, 30 insertions, 5 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index f654ce0bfa..8b46ac216a 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,8 +76,11 @@ module ActionCable
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- if handler = callback || block
- handler = -> message { handler.(coder.decode(message)) } if coder
+ if user_handler = callback || block
+ user_handler = -> message { handler.(coder.decode(message)) } if coder
+ handler = -> message do
+ connection.worker_pool.async_invoke(user_handler, :call, message)
+ end
else
handler = default_stream_handler(broadcasting, coder: coder)
end
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index f51f19eb7d..df129a7c62 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -116,11 +116,22 @@ module ActionCable::StreamTests
require 'action_cable/subscription_adapter/inline'
+ class UserCallbackChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from :channel do
+ Thread.current[:ran_callback] = true
+ end
+ end
+ end
+
class StreamEncodingTest < ActionCable::TestCase
setup do
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
- @server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
+ @server.stubs(:channel_classes).returns(
+ ChatChannel.name => ChatChannel,
+ UserCallbackChannel.name => UserCallbackChannel,
+ )
end
test 'custom encoder' do
@@ -134,6 +145,17 @@ module ActionCable::StreamTests
end
end
+ test "user supplied callbacks are run through the worker pool" do
+ run_in_eventmachine do
+ connection = open_connection
+ receive(connection, command: 'subscribe', channel: UserCallbackChannel.name, identifiers: { id: 1 })
+
+ @server.broadcast 'channel', {}
+ wait_for_async
+ refute Thread.current[:ran_callback], "User callback was not run through the worker pool"
+ end
+ end
+
private
def subscribe_to(connection, identifiers:)
receive connection, command: 'subscribe', identifiers: identifiers
@@ -151,8 +173,8 @@ module ActionCable::StreamTests
end
end
- def receive(connection, command:, identifiers:)
- identifier = JSON.generate(channel: 'ActionCable::StreamTests::ChatChannel', **identifiers)
+ def receive(connection, command:, identifiers:, channel: 'ActionCable::StreamTests::ChatChannel')
+ identifier = JSON.generate(channel: channel, **identifiers)
connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier)
wait_for_async
end