aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb7
-rw-r--r--actioncable/test/channel/stream_test.rb28
2 files changed, 30 insertions, 5 deletions
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index f654ce0bfa..8b46ac216a 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -76,8 +76,11 @@ module ActionCable
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- if handler = callback || block
- handler = -> message { handler.(coder.decode(message)) } if coder
+ if user_handler = callback || block
+ user_handler = -> message { handler.(coder.decode(message)) } if coder
+ handler = -> message do
+ connection.worker_pool.async_invoke(user_handler, :call, message)
+ end
else
handler = default_stream_handler(broadcasting, coder: coder)
end
diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb
index f51f19eb7d..df129a7c62 100644
--- a/actioncable/test/channel/stream_test.rb
+++ b/actioncable/test/channel/stream_test.rb
@@ -116,11 +116,22 @@ module ActionCable::StreamTests
require 'action_cable/subscription_adapter/inline'
+ class UserCallbackChannel < ActionCable::Channel::Base
+ def subscribed
+ stream_from :channel do
+ Thread.current[:ran_callback] = true
+ end
+ end
+ end
+
class StreamEncodingTest < ActionCable::TestCase
setup do
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
- @server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
+ @server.stubs(:channel_classes).returns(
+ ChatChannel.name => ChatChannel,
+ UserCallbackChannel.name => UserCallbackChannel,
+ )
end
test 'custom encoder' do
@@ -134,6 +145,17 @@ module ActionCable::StreamTests
end
end
+ test "user supplied callbacks are run through the worker pool" do
+ run_in_eventmachine do
+ connection = open_connection
+ receive(connection, command: 'subscribe', channel: UserCallbackChannel.name, identifiers: { id: 1 })
+
+ @server.broadcast 'channel', {}
+ wait_for_async
+ refute Thread.current[:ran_callback], "User callback was not run through the worker pool"
+ end
+ end
+
private
def subscribe_to(connection, identifiers:)
receive connection, command: 'subscribe', identifiers: identifiers
@@ -151,8 +173,8 @@ module ActionCable::StreamTests
end
end
- def receive(connection, command:, identifiers:)
- identifier = JSON.generate(channel: 'ActionCable::StreamTests::ChatChannel', **identifiers)
+ def receive(connection, command:, identifiers:, channel: 'ActionCable::StreamTests::ChatChannel')
+ identifier = JSON.generate(channel: channel, **identifiers)
connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier)
wait_for_async
end