diff options
-rw-r--r-- | lib/action_cable/connection.rb | 1 | ||||
-rw-r--r-- | lib/action_cable/connection/base.rb | 17 | ||||
-rw-r--r-- | lib/action_cable/connection/heartbeat.rb | 27 |
3 files changed, 33 insertions, 12 deletions
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 31480f220f..991dd85c57 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,6 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' + autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :Subscriptions, 'action_cable/connection/subscriptions' diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index df07c567fb..6fb0a61743 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -4,13 +4,9 @@ module ActionCable include Identification include InternalChannel - PING_INTERVAL = 3 - attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server - attr_reader :subscriptions - attr_reader :logger def initialize(server, env) @@ -23,6 +19,7 @@ module ActionCable @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) end @@ -33,8 +30,7 @@ module ActionCable @websocket = Faye::WebSocket.new(@env) @websocket.on(:open) do |event| - transmit_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp } + heartbeat.start worker_pool.async.invoke(self, :on_open) end @@ -53,8 +49,8 @@ module ActionCable @websocket.on(:close) do |event| logger.info finished_request_message + heartbeat.stop worker_pool.async.invoke(self, :on_close) - EventMachine.cancel_timer(@ping_timer) if @ping_timer end @websocket.rack_response @@ -109,6 +105,8 @@ module ActionCable private + attr_reader :heartbeat, :subscriptions + def on_open server.add_connection(self) @@ -128,11 +126,6 @@ module ActionCable end - def transmit_ping_timestamp - transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - - def process_message(message) subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data'])) rescue Exception => e diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb new file mode 100644 index 0000000000..47cd937c25 --- /dev/null +++ b/lib/action_cable/connection/heartbeat.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + class Heartbeat + BEAT_INTERVAL = 3 + + def initialize(connection) + @connection = connection + end + + def start + beat + @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } + end + + def stop + EventMachine.cancel_timer(@timer) if @timer + end + + private + attr_reader :connection + + def beat + connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + end + end +end
\ No newline at end of file |