diff options
Diffstat (limited to 'actioncable/lib/action_cable')
3 files changed, 18 insertions, 5 deletions
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index c250cf92fc..5695623859 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -1,3 +1,5 @@ +require 'thread' + module ActionCable module Connection #-- @@ -11,6 +13,7 @@ module ActionCable @stream_send = socket.env['stream.send'] @rack_hijack_io = nil + @write_lock = Mutex.new end def each(&callback) @@ -27,8 +30,10 @@ module ActionCable end def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + @write_lock.synchronize do + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + end rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 8f93564113..1fc58baa3e 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -39,8 +39,12 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" - encoded = coder ? coder.encode(message) : message - server.pubsub.broadcast broadcasting, encoded + + payload = { broadcasting: broadcasting, message: message, coder: coder } + ActiveSupport::Notifications.instrument("broadcast.action_cable", payload) do + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded + end end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb index 37eed09793..4ec513e3ba 100644 --- a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -32,7 +32,11 @@ module ActionCable end def broadcast(channel, message) - list = @sync.synchronize { @subscribers[channel].dup } + list = @sync.synchronize do + return if !@subscribers.key?(channel) + @subscribers[channel].dup + end + list.each do |subscriber| invoke_callback(subscriber, message) end |