diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/action_cable/channel/streams.rb | 2 | ||||
-rw-r--r-- | lib/action_cable/connection.rb | 1 | ||||
-rw-r--r-- | lib/action_cable/connection/base.rb | 17 | ||||
-rw-r--r-- | lib/action_cable/connection/heartbeat.rb | 30 | ||||
-rw-r--r-- | lib/action_cable/connection/identification.rb | 6 | ||||
-rw-r--r-- | lib/action_cable/connection/tagged_logger_proxy.rb | 5 | ||||
-rw-r--r-- | lib/action_cable/server.rb | 2 | ||||
-rw-r--r-- | lib/action_cable/server/base.rb | 1 | ||||
-rw-r--r-- | lib/action_cable/server/connections.rb | 13 | ||||
-rw-r--r-- | lib/action_cable/server/worker.rb | 7 | ||||
-rw-r--r-- | lib/action_cable/server/worker/active_record_connection_management.rb (renamed from lib/action_cable/server/worker/clear_database_connections.rb) | 4 |
11 files changed, 45 insertions, 43 deletions
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb index 2d1506ee98..9fffdf1789 100644 --- a/lib/action_cable/channel/streams.rb +++ b/lib/action_cable/channel/streams.rb @@ -72,7 +72,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - pubsub.subscribe broadcasting, &callback + EM.next_tick { pubsub.subscribe broadcasting, &callback } logger.info "#{self.class.name} is streaming from #{broadcasting}" end diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index c63621c519..3d6ed6a6e8 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -2,7 +2,6 @@ module ActionCable module Connection autoload :Authorization, 'action_cable/connection/authorization' autoload :Base, 'action_cable/connection/base' - autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :MessageBuffer, 'action_cable/connection/message_buffer' diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index f7c5f050d8..2f2fa1fdec 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -59,7 +59,6 @@ module ActionCable @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -112,7 +111,16 @@ module ActionCable # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # This can be returned by a health check against the connection. def statistics - { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } + { + identifier: connection_identifier, + started_at: @started_at, + subscriptions: subscriptions.identifiers, + request_id: @env['action_dispatch.request_id'] + } + end + + def beat + transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) end @@ -133,14 +141,14 @@ module ActionCable private attr_reader :websocket - attr_reader :heartbeat, :subscriptions, :message_buffer + attr_reader :subscriptions, :message_buffer def on_open server.add_connection(self) connect if respond_to?(:connect) subscribe_to_internal_channel - heartbeat.start + beat message_buffer.process! rescue ActionCable::Connection::Authorization::UnauthorizedError @@ -159,7 +167,6 @@ module ActionCable subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel - heartbeat.stop disconnect if respond_to?(:disconnect) end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb deleted file mode 100644 index 2918938ba5..0000000000 --- a/lib/action_cable/connection/heartbeat.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - module Connection - # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically - # disconnect. - class Heartbeat - BEAT_INTERVAL = 3 - - def initialize(connection) - @connection = connection - end - - def start - beat - @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } - end - - def stop - EventMachine.cancel_timer(@timer) if @timer - end - - private - attr_reader :connection - - def beat - connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - end - end -end diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb index 701e6885ad..4e9beac058 100644 --- a/lib/action_cable/connection/identification.rb +++ b/lib/action_cable/connection/identification.rb @@ -22,7 +22,11 @@ module ActionCable # Return a single connection identifier that combines the value of all the registered identifiers into a single gid. def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + if @connection_identifier.blank? + @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + @connection_identifier end private diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index 854f613f1c..34063c1d42 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -4,6 +4,8 @@ module ActionCable # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy + attr_reader :tags + def initialize(logger, tags:) @logger = logger @tags = tags.flatten @@ -22,7 +24,8 @@ module ActionCable protected def log(type, message) - @logger.tagged(*@tags) { @logger.send type, message } + current_tags = tags - @logger.formatter.current_tags + @logger.tagged(*current_tags) { @logger.send type, message } end end end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 919ebd94de..2278509341 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -6,6 +6,6 @@ module ActionCable autoload :Configuration, 'action_cable/server/configuration' autoload :Worker, 'action_cable/server/worker' - autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections' + autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management' end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 43849928b9..9315a48f20 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -18,6 +18,7 @@ module ActionCable # Called by rack to setup the server. def call(env) + setup_heartbeat_timer config.connection_class.new(self, env).process end diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb index 15d7c3c8c7..b3d1632cf7 100644 --- a/lib/action_cable/server/connections.rb +++ b/lib/action_cable/server/connections.rb @@ -4,6 +4,8 @@ module ActionCable # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. # As such, this is primarily for internal use. module Connections + BEAT_INTERVAL = 3 + def connections @connections ||= [] end @@ -16,6 +18,17 @@ module ActionCable connections.delete connection end + # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you + # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # disconnect. + def setup_heartbeat_timer + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map &:beat } + end + end + end + def open_connections_statistics connections.map(&:statistics) end diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb index d7823ecf93..91496775b8 100644 --- a/lib/action_cable/server/worker.rb +++ b/lib/action_cable/server/worker.rb @@ -5,10 +5,13 @@ module ActionCable include ActiveSupport::Callbacks include Celluloid + attr_reader :connection define_callbacks :work - include ClearDatabaseConnections + include ActiveRecordConnectionManagement def invoke(receiver, method, *args) + @connection = receiver + run_callbacks :work do receiver.send method, *args end @@ -20,6 +23,8 @@ module ActionCable end def run_periodic_timer(channel, callback) + @connection = channel.connection + run_callbacks :work do callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb index 722d363a41..1ede0095f8 100644 --- a/lib/action_cable/server/worker/clear_database_connections.rb +++ b/lib/action_cable/server/worker/active_record_connection_management.rb @@ -2,7 +2,7 @@ module ActionCable module Server class Worker # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. - module ClearDatabaseConnections + module ActiveRecordConnectionManagement extend ActiveSupport::Concern included do @@ -12,7 +12,7 @@ module ActionCable end def with_database_connections - yield + ActiveRecord::Base.logger.tagged(*connection.logger.tags) { yield } ensure ActiveRecord::Base.clear_active_connections! end |