diff options
-rw-r--r-- | lib/action_cable/connection.rb | 1 | ||||
-rw-r--r-- | lib/action_cable/connection/base.rb | 10 | ||||
-rw-r--r-- | lib/action_cable/connection/heartbeat.rb | 30 | ||||
-rw-r--r-- | lib/action_cable/server/base.rb | 1 | ||||
-rw-r--r-- | lib/action_cable/server/connections.rb | 11 | ||||
-rw-r--r-- | test/connection/base_test.rb | 4 |
6 files changed, 19 insertions, 38 deletions
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index c63621c519..3d6ed6a6e8 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -2,7 +2,6 @@ module ActionCable module Connection autoload :Authorization, 'action_cable/connection/authorization' autoload :Base, 'action_cable/connection/base' - autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :MessageBuffer, 'action_cable/connection/message_buffer' diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 08a75156a3..de1369f009 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -59,7 +59,6 @@ module ActionCable @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -115,6 +114,10 @@ module ActionCable { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } end + def beat + transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + protected # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc. @@ -133,14 +136,14 @@ module ActionCable private attr_reader :websocket - attr_reader :heartbeat, :subscriptions, :message_buffer + attr_reader :subscriptions, :message_buffer def on_open server.add_connection(self) connect if respond_to?(:connect) subscribe_to_internal_channel - heartbeat.start + beat message_buffer.process! rescue ActionCable::Connection::Authorization::UnauthorizedError @@ -159,7 +162,6 @@ module ActionCable subscriptions.unsubscribe_from_all unsubscribe_from_internal_channel - heartbeat.stop disconnect if respond_to?(:disconnect) end diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb deleted file mode 100644 index 2918938ba5..0000000000 --- a/lib/action_cable/connection/heartbeat.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - module Connection - # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically - # disconnect. - class Heartbeat - BEAT_INTERVAL = 3 - - def initialize(connection) - @connection = connection - end - - def start - beat - @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } - end - - def stop - EventMachine.cancel_timer(@timer) if @timer - end - - private - attr_reader :connection - - def beat - connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - end - end -end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 43849928b9..9315a48f20 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -18,6 +18,7 @@ module ActionCable # Called by rack to setup the server. def call(env) + setup_heartbeat_timer config.connection_class.new(self, env).process end diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb index 15d7c3c8c7..455ff1ab29 100644 --- a/lib/action_cable/server/connections.rb +++ b/lib/action_cable/server/connections.rb @@ -4,6 +4,8 @@ module ActionCable # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. # As such, this is primarily for internal use. module Connections + BEAT_INTERVAL = 3 + def connections @connections ||= [] end @@ -16,6 +18,15 @@ module ActionCable connections.delete connection end + # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you + # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # disconnect. + def setup_heartbeat_timer + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map &:beat } + end + end + def open_connections_statistics connections.map(&:statistics) end diff --git a/test/connection/base_test.rb b/test/connection/base_test.rb index 2f008652ee..81009f0849 100644 --- a/test/connection/base_test.rb +++ b/test/connection/base_test.rb @@ -3,7 +3,7 @@ require 'stubs/test_server' class ActionCable::Connection::BaseTest < ActiveSupport::TestCase class Connection < ActionCable::Connection::Base - attr_reader :websocket, :heartbeat, :subscriptions, :message_buffer, :connected + attr_reader :websocket, :subscriptions, :message_buffer, :connected def connect @connected = true @@ -40,7 +40,6 @@ class ActionCable::Connection::BaseTest < ActiveSupport::TestCase test "on connection open" do assert ! @connection.connected - EventMachine.expects(:add_periodic_timer) @connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) @connection.message_buffer.expects(:process!) @@ -56,7 +55,6 @@ class ActionCable::Connection::BaseTest < ActiveSupport::TestCase @connection.send :on_open assert @connection.connected - EventMachine.expects(:cancel_timer) @connection.subscriptions.expects(:unsubscribe_from_all) @connection.send :on_close |