diff options
Diffstat (limited to 'actioncable')
10 files changed, 100 insertions, 15 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index a767857607..cc15e9bf61 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,2 +1,17 @@ +* Protect against concurrent writes to a websocket connection from + multiple threads; the underlying OS write is not always threadsafe. + + *Tinco Andringa* + +* Add ActiveSupport::Notifications hook to Broadcaster#broadcast + + *Matthew Wear* + +* Close hijacked socket when connection is shut down. + + Fixes #25613. + + *Tinco Andringa* + Please check [5-0-stable](https://github.com/rails/rails/blob/5-0-stable/actioncable/CHANGELOG.md) for previous changes. diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index c250cf92fc..5695623859 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -1,3 +1,5 @@ +require 'thread' + module ActionCable module Connection #-- @@ -11,6 +13,7 @@ module ActionCable @stream_send = socket.env['stream.send'] @rack_hijack_io = nil + @write_lock = Mutex.new end def each(&callback) @@ -27,8 +30,10 @@ module ActionCable end def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + @write_lock.synchronize do + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + end rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 8f93564113..1fc58baa3e 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -39,8 +39,12 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" - encoded = coder ? coder.encode(message) : message - server.pubsub.broadcast broadcasting, encoded + + payload = { broadcasting: broadcasting, message: message, coder: coder } + ActiveSupport::Notifications.instrument("broadcast.action_cable", payload) do + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded + end end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb index 37eed09793..4ec513e3ba 100644 --- a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -32,7 +32,11 @@ module ActionCable end def broadcast(channel, message) - list = @sync.synchronize { @subscribers[channel].dup } + list = @sync.synchronize do + return if !@subscribers.key?(channel) + @subscribers[channel].dup + end + list.each do |subscriber| invoke_callback(subscriber, message) end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb index 17a85f60f9..d672697283 100644 --- a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb @@ -1,5 +1,3 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in -# a loop that does not support auto reloading. module ApplicationCable class Channel < ActionCable::Channel::Base end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb index 93f28c4306..0ff5442f47 100644 --- a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb @@ -1,5 +1,3 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in -# a loop that does not support auto reloading. module ApplicationCable class Connection < ActionCable::Connection::Base end diff --git a/actioncable/lib/rails/generators/channel/templates/assets/cable.js b/actioncable/lib/rails/generators/channel/templates/assets/cable.js index 71ee1e66de..739aa5f022 100644 --- a/actioncable/lib/rails/generators/channel/templates/assets/cable.js +++ b/actioncable/lib/rails/generators/channel/templates/assets/cable.js @@ -1,5 +1,5 @@ // Action Cable provides the framework to deal with WebSockets in Rails. -// You can generate new channels where WebSocket features live using the rails generate channel command. +// You can generate new channels where WebSocket features live using the `rails generate channel` command. // //= require action_cable //= require_self diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 7bff3341c1..4bcfb2be4d 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,3 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed diff --git a/actioncable/test/server/broadcasting_test.rb b/actioncable/test/server/broadcasting_test.rb index 3b4a7eaf90..ed377b7d5d 100644 --- a/actioncable/test/server/broadcasting_test.rb +++ b/actioncable/test/server/broadcasting_test.rb @@ -1,10 +1,7 @@ require "test_helper" +require "stubs/test_server" class BroadcastingTest < ActiveSupport::TestCase - class TestServer - include ActionCable::Server::Broadcasting - end - test "fetching a broadcaster converts the broadcasting queue to a string" do broadcasting = :test_queue server = TestServer.new @@ -12,4 +9,52 @@ class BroadcastingTest < ActiveSupport::TestCase assert_equal "test_queue", broadcaster.broadcasting end + + test "broadcast generates notification" do + begin + server = TestServer.new + + events = [] + ActiveSupport::Notifications.subscribe "broadcast.action_cable" do |*args| + events << ActiveSupport::Notifications::Event.new(*args) + end + + broadcasting = "test_queue" + message = { body: "test message" } + server.broadcast(broadcasting, message) + + assert_equal 1, events.length + assert_equal "broadcast.action_cable", events[0].name + assert_equal broadcasting, events[0].payload[:broadcasting] + assert_equal message, events[0].payload[:message] + assert_equal ActiveSupport::JSON, events[0].payload[:coder] + ensure + ActiveSupport::Notifications.unsubscribe "broadcast.action_cable" + end + end + + test "broadcaster from broadcaster_for generates notification" do + begin + server = TestServer.new + + events = [] + ActiveSupport::Notifications.subscribe "broadcast.action_cable" do |*args| + events << ActiveSupport::Notifications::Event.new(*args) + end + + broadcasting = "test_queue" + message = { body: "test message" } + + broadcaster = server.broadcaster_for(broadcasting) + broadcaster.broadcast(message) + + assert_equal 1, events.length + assert_equal "broadcast.action_cable", events[0].name + assert_equal broadcasting, events[0].payload[:broadcasting] + assert_equal message, events[0].payload[:message] + assert_equal ActiveSupport::JSON, events[0].payload[:coder] + ensure + ActiveSupport::Notifications.unsubscribe "broadcast.action_cable" + end + end end diff --git a/actioncable/test/subscription_adapter/subscriber_map_test.rb b/actioncable/test/subscription_adapter/subscriber_map_test.rb new file mode 100644 index 0000000000..5965ac2b90 --- /dev/null +++ b/actioncable/test/subscription_adapter/subscriber_map_test.rb @@ -0,0 +1,17 @@ +require 'test_helper' + +class SubscriberMapTest < ActionCable::TestCase + test "broadcast should not change subscribers" do + setup_subscription_map + origin = @subscription_map.instance_variable_get(:@subscribers).dup + + @subscription_map.broadcast('not_exist_channel', '') + + assert_equal origin, @subscription_map.instance_variable_get(:@subscribers) + end + + private + def setup_subscription_map + @subscription_map = ActionCable::SubscriptionAdapter::SubscriberMap.new + end +end |