aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'lib/action_cable')
-rw-r--r--lib/action_cable/connection.rb1
-rw-r--r--lib/action_cable/connection/base.rb17
-rw-r--r--lib/action_cable/connection/heartbeat.rb27
3 files changed, 33 insertions, 12 deletions
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
index 31480f220f..991dd85c57 100644
--- a/lib/action_cable/connection.rb
+++ b/lib/action_cable/connection.rb
@@ -1,6 +1,7 @@
module ActionCable
module Connection
autoload :Base, 'action_cable/connection/base'
+ autoload :Heartbeat, 'action_cable/connection/heartbeat'
autoload :Identification, 'action_cable/connection/identification'
autoload :InternalChannel, 'action_cable/connection/internal_channel'
autoload :Subscriptions, 'action_cable/connection/subscriptions'
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index df07c567fb..6fb0a61743 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -4,13 +4,9 @@ module ActionCable
include Identification
include InternalChannel
- PING_INTERVAL = 3
-
attr_reader :server, :env
delegate :worker_pool, :pubsub, to: :server
- attr_reader :subscriptions
-
attr_reader :logger
def initialize(server, env)
@@ -23,6 +19,7 @@ module ActionCable
@logger = TaggedLoggerProxy.new(server.logger, tags: log_tags)
+ @heartbeat = ActionCable::Connection::Heartbeat.new(self)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
end
@@ -33,8 +30,7 @@ module ActionCable
@websocket = Faye::WebSocket.new(@env)
@websocket.on(:open) do |event|
- transmit_ping_timestamp
- @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp }
+ heartbeat.start
worker_pool.async.invoke(self, :on_open)
end
@@ -53,8 +49,8 @@ module ActionCable
@websocket.on(:close) do |event|
logger.info finished_request_message
+ heartbeat.stop
worker_pool.async.invoke(self, :on_close)
- EventMachine.cancel_timer(@ping_timer) if @ping_timer
end
@websocket.rack_response
@@ -109,6 +105,8 @@ module ActionCable
private
+ attr_reader :heartbeat, :subscriptions
+
def on_open
server.add_connection(self)
@@ -128,11 +126,6 @@ module ActionCable
end
- def transmit_ping_timestamp
- transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
-
-
def process_message(message)
subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data']))
rescue Exception => e
diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb
new file mode 100644
index 0000000000..47cd937c25
--- /dev/null
+++ b/lib/action_cable/connection/heartbeat.rb
@@ -0,0 +1,27 @@
+module ActionCable
+ module Connection
+ class Heartbeat
+ BEAT_INTERVAL = 3
+
+ def initialize(connection)
+ @connection = connection
+ end
+
+ def start
+ beat
+ @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
+ end
+
+ def stop
+ EventMachine.cancel_timer(@timer) if @timer
+ end
+
+ private
+ attr_reader :connection
+
+ def beat
+ connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
+ end
+ end
+ end
+end \ No newline at end of file