aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/action_cable/channel/streams.rb2
-rw-r--r--lib/action_cable/connection.rb1
-rw-r--r--lib/action_cable/connection/base.rb17
-rw-r--r--lib/action_cable/connection/heartbeat.rb30
-rw-r--r--lib/action_cable/connection/identification.rb6
-rw-r--r--lib/action_cable/connection/tagged_logger_proxy.rb5
-rw-r--r--lib/action_cable/server.rb2
-rw-r--r--lib/action_cable/server/base.rb1
-rw-r--r--lib/action_cable/server/connections.rb13
-rw-r--r--lib/action_cable/server/worker.rb7
-rw-r--r--lib/action_cable/server/worker/active_record_connection_management.rb (renamed from lib/action_cable/server/worker/clear_database_connections.rb)4
-rw-r--r--test/connection/base_test.rb4
12 files changed, 46 insertions, 46 deletions
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb
index 2d1506ee98..9fffdf1789 100644
--- a/lib/action_cable/channel/streams.rb
+++ b/lib/action_cable/channel/streams.rb
@@ -72,7 +72,7 @@ module ActionCable
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- pubsub.subscribe broadcasting, &callback
+ EM.next_tick { pubsub.subscribe broadcasting, &callback }
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
index c63621c519..3d6ed6a6e8 100644
--- a/lib/action_cable/connection.rb
+++ b/lib/action_cable/connection.rb
@@ -2,7 +2,6 @@ module ActionCable
module Connection
autoload :Authorization, 'action_cable/connection/authorization'
autoload :Base, 'action_cable/connection/base'
- autoload :Heartbeat, 'action_cable/connection/heartbeat'
autoload :Identification, 'action_cable/connection/identification'
autoload :InternalChannel, 'action_cable/connection/internal_channel'
autoload :MessageBuffer, 'action_cable/connection/message_buffer'
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index f7c5f050d8..2f2fa1fdec 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -59,7 +59,6 @@ module ActionCable
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env)
- @heartbeat = ActionCable::Connection::Heartbeat.new(self)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -112,7 +111,16 @@ module ActionCable
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
# This can be returned by a health check against the connection.
def statistics
- { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+
+ def beat
+ transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
end
@@ -133,14 +141,14 @@ module ActionCable
private
attr_reader :websocket
- attr_reader :heartbeat, :subscriptions, :message_buffer
+ attr_reader :subscriptions, :message_buffer
def on_open
server.add_connection(self)
connect if respond_to?(:connect)
subscribe_to_internal_channel
- heartbeat.start
+ beat
message_buffer.process!
rescue ActionCable::Connection::Authorization::UnauthorizedError
@@ -159,7 +167,6 @@ module ActionCable
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
- heartbeat.stop
disconnect if respond_to?(:disconnect)
end
diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb
deleted file mode 100644
index 2918938ba5..0000000000
--- a/lib/action_cable/connection/heartbeat.rb
+++ /dev/null
@@ -1,30 +0,0 @@
-module ActionCable
- module Connection
- # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
- # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
- # disconnect.
- class Heartbeat
- BEAT_INTERVAL = 3
-
- def initialize(connection)
- @connection = connection
- end
-
- def start
- beat
- @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
- end
-
- def stop
- EventMachine.cancel_timer(@timer) if @timer
- end
-
- private
- attr_reader :connection
-
- def beat
- connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
- end
- end
-end
diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb
index 701e6885ad..4e9beac058 100644
--- a/lib/action_cable/connection/identification.rb
+++ b/lib/action_cable/connection/identification.rb
@@ -22,7 +22,11 @@ module ActionCable
# Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
def connection_identifier
- @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ if @connection_identifier.blank?
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
end
private
diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb
index 854f613f1c..34063c1d42 100644
--- a/lib/action_cable/connection/tagged_logger_proxy.rb
+++ b/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -4,6 +4,8 @@ module ActionCable
# ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
+ attr_reader :tags
+
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten
@@ -22,7 +24,8 @@ module ActionCable
protected
def log(type, message)
- @logger.tagged(*@tags) { @logger.send type, message }
+ current_tags = tags - @logger.formatter.current_tags
+ @logger.tagged(*current_tags) { @logger.send type, message }
end
end
end
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index 919ebd94de..2278509341 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -6,6 +6,6 @@ module ActionCable
autoload :Configuration, 'action_cable/server/configuration'
autoload :Worker, 'action_cable/server/worker'
- autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections'
+ autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management'
end
end
diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb
index 43849928b9..9315a48f20 100644
--- a/lib/action_cable/server/base.rb
+++ b/lib/action_cable/server/base.rb
@@ -18,6 +18,7 @@ module ActionCable
# Called by rack to setup the server.
def call(env)
+ setup_heartbeat_timer
config.connection_class.new(self, env).process
end
diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb
index 15d7c3c8c7..b3d1632cf7 100644
--- a/lib/action_cable/server/connections.rb
+++ b/lib/action_cable/server/connections.rb
@@ -4,6 +4,8 @@ module ActionCable
# you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
# As such, this is primarily for internal use.
module Connections
+ BEAT_INTERVAL = 3
+
def connections
@connections ||= []
end
@@ -16,6 +18,17 @@ module ActionCable
connections.delete connection
end
+ # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
+ # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
+ # disconnect.
+ def setup_heartbeat_timer
+ EM.next_tick do
+ @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
+ EM.next_tick { connections.map &:beat }
+ end
+ end
+ end
+
def open_connections_statistics
connections.map(&:statistics)
end
diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb
index d7823ecf93..91496775b8 100644
--- a/lib/action_cable/server/worker.rb
+++ b/lib/action_cable/server/worker.rb
@@ -5,10 +5,13 @@ module ActionCable
include ActiveSupport::Callbacks
include Celluloid
+ attr_reader :connection
define_callbacks :work
- include ClearDatabaseConnections
+ include ActiveRecordConnectionManagement
def invoke(receiver, method, *args)
+ @connection = receiver
+
run_callbacks :work do
receiver.send method, *args
end
@@ -20,6 +23,8 @@ module ActionCable
end
def run_periodic_timer(channel, callback)
+ @connection = channel.connection
+
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb
index 722d363a41..1ede0095f8 100644
--- a/lib/action_cable/server/worker/clear_database_connections.rb
+++ b/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -2,7 +2,7 @@ module ActionCable
module Server
class Worker
# Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
- module ClearDatabaseConnections
+ module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
included do
@@ -12,7 +12,7 @@ module ActionCable
end
def with_database_connections
- yield
+ ActiveRecord::Base.logger.tagged(*connection.logger.tags) { yield }
ensure
ActiveRecord::Base.clear_active_connections!
end
diff --git a/test/connection/base_test.rb b/test/connection/base_test.rb
index bc8b5ba568..7118c34d9e 100644
--- a/test/connection/base_test.rb
+++ b/test/connection/base_test.rb
@@ -3,7 +3,7 @@ require 'stubs/test_server'
class ActionCable::Connection::BaseTest < ActiveSupport::TestCase
class Connection < ActionCable::Connection::Base
- attr_reader :websocket, :heartbeat, :subscriptions, :message_buffer, :connected
+ attr_reader :websocket, :subscriptions, :message_buffer, :connected
def connect
@connected = true
@@ -43,7 +43,6 @@ class ActionCable::Connection::BaseTest < ActiveSupport::TestCase
test "on connection open" do
assert ! @connection.connected
- EventMachine.expects(:add_periodic_timer)
@connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
@connection.message_buffer.expects(:process!)
@@ -59,7 +58,6 @@ class ActionCable::Connection::BaseTest < ActiveSupport::TestCase
@connection.send :on_open
assert @connection.connected
- EventMachine.expects(:cancel_timer)
@connection.subscriptions.expects(:unsubscribe_from_all)
@connection.send :on_close