From f9ef9486f157da6573a7c4867f905d4e57023e12 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 15:06:15 +0200 Subject: Space --- lib/action_cable/connection/tagged_logger_proxy.rb | 1 - 1 file changed, 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index d99cc2e9a3..e9e12e2672 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -1,7 +1,6 @@ module ActionCable module Connection class TaggedLoggerProxy - def initialize(logger, tags:) @logger = logger @tags = tags.flatten -- cgit v1.2.3 From 829ae0b2e2cbd580a89a933ce032dae30aa34629 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 19:39:43 +0200 Subject: Spacing --- lib/action_cable/connection/identifier.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb index a608fc546a..62524263bd 100644 --- a/lib/action_cable/connection/identifier.rb +++ b/lib/action_cable/connection/identifier.rb @@ -6,11 +6,11 @@ module ActionCable end def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}.compact + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact end def connection_gid(ids) - ids.map {|o| o.to_global_id.to_s }.sort.join(":") + ids.map { |o| o.to_global_id.to_s }.sort.join(":") end end end -- cgit v1.2.3 From e2a5a323fd1764c8a2b8d34ebfde65e527a1aedd Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 19:47:36 +0200 Subject: Homogenize lifecycle method names Active, present voice. --- lib/action_cable/connection/base.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 89d0844031..2ca284b62f 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -44,7 +44,7 @@ module ActionCable if message.is_a?(String) if @accept_messages - worker_pool.async.invoke(self, :received_data, message) + worker_pool.async.invoke(self, :receive_data, message) else @pending_messages << message end @@ -54,7 +54,7 @@ module ActionCable @websocket.on(:close) do |event| logger.info finished_request_message - worker_pool.async.invoke(self, :on_connection_closed) + worker_pool.async.invoke(self, :close_connection) EventMachine.cancel_timer(@ping_timer) if @ping_timer end @@ -64,7 +64,6 @@ module ActionCable end end - def received_data(data) return unless websocket_alive? data = ActiveSupport::JSON.decode data @@ -76,6 +75,7 @@ module ActionCable unsubscribe_channel(data) when 'message' process_message(data) + def receive_data(data) else logger.error "Received unrecognized command in #{data.inspect}" end @@ -124,10 +124,10 @@ module ActionCable subscribe_to_internal_channel @accept_messages = true - worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + worker_pool.async.invoke(self, :receive_data, @pending_messages.shift) until @pending_messages.empty? end - def on_connection_closed + def close_connection server.remove_connection(self) cleanup_subscriptions -- cgit v1.2.3 From b7ce9b652e2de0f724941b076078370e4c7590bc Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 19:48:25 +0200 Subject: Add logging for when data is received without a live web socket And make cleanup_subscriptions private --- lib/action_cable/connection/base.rb | 39 ++++++++++++++++++++----------------- 1 file changed, 21 insertions(+), 18 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 2ca284b62f..32f6d0ae91 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -64,26 +64,22 @@ module ActionCable end end - return unless websocket_alive? - - data = ActiveSupport::JSON.decode data - - case data['command'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) def receive_data(data) + if websocket_alive? + data = ActiveSupport::JSON.decode data + + case data['command'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + else + logger.error "Received unrecognized command in #{data.inspect}" + end else - logger.error "Received unrecognized command in #{data.inspect}" - end - end - - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.perform_disconnection + logger.error "Received data without a live websocket (#{data.inspect})" end end @@ -135,6 +131,13 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.perform_disconnection + end + end + + def transmit_ping_timestamp transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) end -- cgit v1.2.3 From 890bee58f31ffe65e0127a8795c3432a18633013 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 19:49:31 +0200 Subject: Clarify that the incoming data is JSON --- lib/action_cable/connection/base.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 32f6d0ae91..321a57fe44 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -64,9 +64,9 @@ module ActionCable end end - def receive_data(data) + def receive_data(data_in_json) if websocket_alive? - data = ActiveSupport::JSON.decode data + data = ActiveSupport::JSON.decode data_in_json case data['command'] when 'subscribe' -- cgit v1.2.3 From 6ae798fc842c65011528175136fcda85d95ab16c Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 19:50:17 +0200 Subject: Styling --- lib/action_cable/connection/base.rb | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 321a57fe44..6973848589 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -69,12 +69,9 @@ module ActionCable data = ActiveSupport::JSON.decode data_in_json case data['command'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) + when 'subscribe' then subscribe_channel data + when 'unsubscribe' then unsubscribe_channel data + when 'message' then process_message data else logger.error "Received unrecognized command in #{data.inspect}" end -- cgit v1.2.3 From a4a68c2aff6b9aa114b0309d008e7f4180169bd4 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 20:25:53 +0200 Subject: Match transmit. No need to qualify _data --- lib/action_cable/connection/base.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 6973848589..4a67167bac 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -44,7 +44,7 @@ module ActionCable if message.is_a?(String) if @accept_messages - worker_pool.async.invoke(self, :receive_data, message) + worker_pool.async.invoke(self, :receive, message) else @pending_messages << message end @@ -64,7 +64,7 @@ module ActionCable end end - def receive_data(data_in_json) + def receive(data_in_json) if websocket_alive? data = ActiveSupport::JSON.decode data_in_json @@ -117,7 +117,7 @@ module ActionCable subscribe_to_internal_channel @accept_messages = true - worker_pool.async.invoke(self, :receive_data, @pending_messages.shift) until @pending_messages.empty? + worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty? end def close_connection -- cgit v1.2.3 From cc5ad6a65729a7c2c922e230c68b137762b8b127 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 20:55:16 +0200 Subject: Order of appearance --- lib/action_cable/connection/base.rb | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 4a67167bac..4b73a90dc1 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -84,13 +84,6 @@ module ActionCable @websocket.send data end - def statistics - { - identifier: connection_identifier, - started_at: @started_at, - subscriptions: @subscriptions.keys - } - end def handle_exception close_connection @@ -101,6 +94,16 @@ module ActionCable @websocket.close end + + def statistics + { + identifier: connection_identifier, + started_at: @started_at, + subscriptions: @subscriptions.keys + } + end + + protected def request @request ||= ActionDispatch::Request.new(Rails.application.env_config.merge(env)) -- cgit v1.2.3 From 81ae9ee32162ececbd70664b2821e7c636eaed8b Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 21:08:20 +0200 Subject: Consolidate all identification logic in a single concern --- lib/action_cable/connection.rb | 2 +- lib/action_cable/connection/base.rb | 10 ++-------- lib/action_cable/connection/identification.rb | 26 +++++++++++++++++++++++++ lib/action_cable/connection/identifier.rb | 17 ---------------- lib/action_cable/connection/internal_channel.rb | 4 ++++ 5 files changed, 33 insertions(+), 26 deletions(-) create mode 100644 lib/action_cable/connection/identification.rb delete mode 100644 lib/action_cable/connection/identifier.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index a9048926e4..8a695a3d0d 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,8 +1,8 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' + autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' - autoload :Identifier, 'action_cable/connection/identifier' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 4b73a90dc1..0d666713d2 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -1,17 +1,11 @@ module ActionCable module Connection class Base - include InternalChannel, Identifier + include Identification + include InternalChannel PING_INTERVAL = 3 - class_attribute :identifiers - self.identifiers = Set.new - - def self.identified_by(*identifiers) - self.identifiers += identifiers - end - attr_reader :env, :server, :logger delegate :worker_pool, :pubsub, to: :server diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb new file mode 100644 index 0000000000..246636198b --- /dev/null +++ b/lib/action_cable/connection/identification.rb @@ -0,0 +1,26 @@ +module ActionCable + module Connection + module Identification + extend ActiveSupport::Concern + + included do + class_attribute :identifiers + self.identifiers = Set.new + end + + class_methods do + def identified_by(*identifiers) + self.identifiers += identifiers + end + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact + end + + def connection_gid(ids) + ids.map { |o| o.to_global_id.to_s }.sort.join(":") + end + end + end +end diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb deleted file mode 100644 index 62524263bd..0000000000 --- a/lib/action_cable/connection/identifier.rb +++ /dev/null @@ -1,17 +0,0 @@ -module ActionCable - module Connection - module Identifier - def internal_redis_channel - "action_cable/#{connection_identifier}" - end - - def connection_identifier - @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact - end - - def connection_gid(ids) - ids.map { |o| o.to_global_id.to_s }.sort.join(":") - end - end - end -end diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index 3a11bcaf7b..55dfc72777 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -3,6 +3,10 @@ module ActionCable module InternalChannel extend ActiveSupport::Concern + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + def subscribe_to_internal_channel if connection_identifier.present? callback = -> (message) { process_internal_message(message) } -- cgit v1.2.3 From 22b9882ea6ab621ac5deceb700ec503f796812e6 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 21:34:04 +0200 Subject: Styling --- lib/action_cable/connection/base.rb | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 0d666713d2..175b596241 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -5,17 +5,20 @@ module ActionCable include InternalChannel PING_INTERVAL = 3 - - attr_reader :env, :server, :logger + + attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server + attr_reader :logger + def initialize(server, env) @started_at = Time.now - @server = server - @env = env - @accept_messages = false + @server, @env = server, env + + @accept_messages = false @pending_messages = [] + @subscriptions = {} @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) -- cgit v1.2.3 From f8638f789a1cbd33205cdce0dd24f2aee3d69a25 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 21:41:43 +0200 Subject: Rename callback hooks to match setup And make it all private --- lib/action_cable/connection/base.rb | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 175b596241..f6beda1c57 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -33,7 +33,7 @@ module ActionCable @websocket.on(:open) do |event| transmit_ping_timestamp @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp } - worker_pool.async.invoke(self, :initialize_connection) + worker_pool.async.invoke(self, :on_open) end @websocket.on(:message) do |event| @@ -51,7 +51,7 @@ module ActionCable @websocket.on(:close) do |event| logger.info finished_request_message - worker_pool.async.invoke(self, :close_connection) + worker_pool.async.invoke(self, :on_close) EventMachine.cancel_timer(@ping_timer) if @ping_timer end @@ -110,7 +110,9 @@ module ActionCable request.cookie_jar end - def initialize_connection + + private + def on_open server.add_connection(self) connect if respond_to?(:connect) @@ -120,7 +122,7 @@ module ActionCable worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty? end - def close_connection + def on_close server.remove_connection(self) cleanup_subscriptions -- cgit v1.2.3 From 125a8445f304348d3e530e85b30991c2346155fd Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 21:44:22 +0200 Subject: Don't namespace methods with the class we are already in --- lib/action_cable/connection/base.rb | 4 ++-- lib/action_cable/server.rb | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index f6beda1c57..d96216edc2 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -83,10 +83,10 @@ module ActionCable def handle_exception - close_connection + close end - def close_connection + def close logger.error "Closing connection" @websocket.close end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 322fc85519..3a16f51757 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -34,7 +34,7 @@ module ActionCable redis.on(:reconnect_failed) do logger.info "[ActionCable] Redis reconnect failed." # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close_connection + # @connections.map &:close end redis end -- cgit v1.2.3 From 1029c49c32cf3b8c7a013753783ab37f364ad65d Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 21:45:37 +0200 Subject: Remove anemic indirection --- lib/action_cable/connection/base.rb | 5 ----- lib/action_cable/connection/internal_channel.rb | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index d96216edc2..ffef85ead2 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -81,11 +81,6 @@ module ActionCable @websocket.send data end - - def handle_exception - close - end - def close logger.error "Closing connection" @websocket.close diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index 55dfc72777..885457d8c2 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -37,7 +37,7 @@ module ActionCable logger.error "There was an exception - #{e.class}(#{e.message})" logger.error e.backtrace.join("\n") - handle_exception + close end end end -- cgit v1.2.3 From e7b1ced7a4fab45e3fc5851e5500426022fa0c47 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 22:35:07 +0200 Subject: Extracted Subscriptions class --- lib/action_cable/connection.rb | 1 + lib/action_cable/connection/base.rb | 46 +++++------------------- lib/action_cable/connection/subscriptions.rb | 54 ++++++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 37 deletions(-) create mode 100644 lib/action_cable/connection/subscriptions.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 8a695a3d0d..31480f220f 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -3,6 +3,7 @@ module ActionCable autoload :Base, 'action_cable/connection/base' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' + autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index ffef85ead2..df07c567fb 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -9,6 +9,8 @@ module ActionCable attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server + attr_reader :subscriptions + attr_reader :logger def initialize(server, env) @@ -19,9 +21,9 @@ module ActionCable @accept_messages = false @pending_messages = [] - @subscriptions = {} - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + + @subscriptions = ActionCable::Connection::Subscriptions.new(self) end def process @@ -66,8 +68,8 @@ module ActionCable data = ActiveSupport::JSON.decode data_in_json case data['command'] - when 'subscribe' then subscribe_channel data - when 'unsubscribe' then unsubscribe_channel data + when 'subscribe' then subscriptions.add data + when 'unsubscribe' then subscriptions.remove data when 'message' then process_message data else logger.error "Received unrecognized command in #{data.inspect}" @@ -91,7 +93,7 @@ module ActionCable { identifier: connection_identifier, started_at: @started_at, - subscriptions: @subscriptions.keys + subscriptions: subscriptions.identifiers } end @@ -120,54 +122,24 @@ module ActionCable def on_close server.remove_connection(self) - cleanup_subscriptions + subscriptions.cleanup unsubscribe_from_internal_channel disconnect if respond_to?(:disconnect) end - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.perform_disconnection - end - end - def transmit_ping_timestamp transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) end - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } - - if subscription_klass - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Subscription class not found (#{data.inspect})" - end - rescue Exception => e - logger.error "Could not subscribe to channel (#{data.inspect})" - log_exception(e) - end def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data']) - else - raise "Unable to process message because no subscription was found (#{message.inspect})" - end + subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data'])) rescue Exception => e logger.error "Could not process message (#{message.inspect})" log_exception(e) end - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].perform_disconnection - @subscriptions.delete(data['identifier']) - end def invalid_request logger.info finished_request_message diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb new file mode 100644 index 0000000000..888b93a652 --- /dev/null +++ b/lib/action_cable/connection/subscriptions.rb @@ -0,0 +1,54 @@ +module ActionCable + module Connection + class Subscriptions + def initialize(connection) + @connection = connection + @subscriptions = {} + end + + def add(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = connection.server.registered_channels.detect do |channel_klass| + channel_klass.find_name == id_options[:channel] + end + + if subscription_klass + subscriptions[id_key] = subscription_klass.new(connection, id_key, id_options) + else + connection.logger.error "Subscription class not found (#{data.inspect})" + end + rescue Exception => e + connection.logger.error "Could not subscribe to channel (#{data.inspect}) due to '#{e}': #{e.backtrace.join(' - ')}" + end + + def remove(data) + connection.logger.info "Unsubscribing from channel: #{data['identifier']}" + subscriptions[data['identifier']].perform_disconnection + subscriptions.delete(data['identifier']) + end + + def find(identifier) + if subscription = subscriptions[identifier] + subscription + else + raise "Unable to find subscription with identifier: #{identifier}" + end + end + + def identifiers + subscriptions.keys + end + + def cleanup + subscriptions.each do |id, channel| + channel.perform_disconnection + end + end + + private + attr_reader :connection, :subscriptions + end + end +end \ No newline at end of file -- cgit v1.2.3 From 786bbbb0ee1de0f2c8c9be517b8d5c93f95421d4 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 22:49:29 +0200 Subject: Extract Heartbeat class to perform periodical ping --- lib/action_cable/connection.rb | 1 + lib/action_cable/connection/base.rb | 17 +++++------------ lib/action_cable/connection/heartbeat.rb | 27 +++++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 12 deletions(-) create mode 100644 lib/action_cable/connection/heartbeat.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 31480f220f..991dd85c57 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,6 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' + autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :Subscriptions, 'action_cable/connection/subscriptions' diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index df07c567fb..6fb0a61743 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -4,13 +4,9 @@ module ActionCable include Identification include InternalChannel - PING_INTERVAL = 3 - attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server - attr_reader :subscriptions - attr_reader :logger def initialize(server, env) @@ -23,6 +19,7 @@ module ActionCable @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) end @@ -33,8 +30,7 @@ module ActionCable @websocket = Faye::WebSocket.new(@env) @websocket.on(:open) do |event| - transmit_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp } + heartbeat.start worker_pool.async.invoke(self, :on_open) end @@ -53,8 +49,8 @@ module ActionCable @websocket.on(:close) do |event| logger.info finished_request_message + heartbeat.stop worker_pool.async.invoke(self, :on_close) - EventMachine.cancel_timer(@ping_timer) if @ping_timer end @websocket.rack_response @@ -109,6 +105,8 @@ module ActionCable private + attr_reader :heartbeat, :subscriptions + def on_open server.add_connection(self) @@ -128,11 +126,6 @@ module ActionCable end - def transmit_ping_timestamp - transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - - def process_message(message) subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data'])) rescue Exception => e diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb new file mode 100644 index 0000000000..47cd937c25 --- /dev/null +++ b/lib/action_cable/connection/heartbeat.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + class Heartbeat + BEAT_INTERVAL = 3 + + def initialize(connection) + @connection = connection + end + + def start + beat + @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat } + end + + def stop + EventMachine.cancel_timer(@timer) if @timer + end + + private + attr_reader :connection + + def beat + connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + end + end +end \ No newline at end of file -- cgit v1.2.3 From da098df459d4d60efce418ab79121160eaf45d03 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 22:51:55 +0200 Subject: Centralize logging in process and enhance method name --- lib/action_cable/connection/base.rb | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 6fb0a61743..cac127ab45 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -55,7 +55,9 @@ module ActionCable @websocket.rack_response else - invalid_request + logger.info finished_request_message + + respond_to_invalid_request end end @@ -134,9 +136,8 @@ module ActionCable end - def invalid_request - logger.info finished_request_message - [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + def respond_to_invalid_request + [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end def websocket_alive? -- cgit v1.2.3 From 375b315da62d55b47074ab8cdde60eac4dfaef2a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 22:53:20 +0200 Subject: Add logging for when message isn't a string --- lib/action_cable/connection/base.rb | 2 ++ 1 file changed, 2 insertions(+) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index cac127ab45..995b0901ca 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -43,6 +43,8 @@ module ActionCable else @pending_messages << message end + else + logger.error "Couldn't handle non-string message: #{message.class}" end end -- cgit v1.2.3 From e0926038983177f5491e45cc338e5dc091e3a86d Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 23:02:46 +0200 Subject: Wrap message queueing in a more welcoming API --- lib/action_cable/connection/base.rb | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 995b0901ca..71e84aed99 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -14,9 +14,6 @@ module ActionCable @server, @env = server, env - @accept_messages = false - @pending_messages = [] - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) @heartbeat = ActionCable::Connection::Heartbeat.new(self) @@ -38,10 +35,10 @@ module ActionCable message = event.data if message.is_a?(String) - if @accept_messages + if accepting_messages? worker_pool.async.invoke(self, :receive, message) else - @pending_messages << message + queue_message message end else logger.error "Couldn't handle non-string message: #{message.class}" @@ -117,10 +114,29 @@ module ActionCable connect if respond_to?(:connect) subscribe_to_internal_channel + ready_to_accept_messages + process_pending_messages + end + + + def accepting_messages? + @accept_messages + end + + def ready_to_accept_messages @accept_messages = true + end + + def queue_message(message) + @pending_messages ||= [] + @pending_messages << message + end + + def process_pending_messages worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty? end + def on_close server.remove_connection(self) -- cgit v1.2.3 From 8115d25033a0af2d29b57e1e5a6afaa70038a3d4 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 14:15:28 +0200 Subject: WIP: Extract processor --- lib/action_cable/connection/base.rb | 13 ++------ lib/action_cable/connection/processor.rb | 54 ++++++++++++++++++++++++++++++++ 2 files changed, 56 insertions(+), 11 deletions(-) create mode 100644 lib/action_cable/connection/processor.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 71e84aed99..1fdc6f0fe8 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -18,6 +18,7 @@ module ActionCable @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) + @processor = ActionCable::Connection::Processor.new(self) end def process @@ -32,17 +33,7 @@ module ActionCable end @websocket.on(:message) do |event| - message = event.data - - if message.is_a?(String) - if accepting_messages? - worker_pool.async.invoke(self, :receive, message) - else - queue_message message - end - else - logger.error "Couldn't handle non-string message: #{message.class}" - end + processor.handle event.data end @websocket.on(:close) do |event| diff --git a/lib/action_cable/connection/processor.rb b/lib/action_cable/connection/processor.rb new file mode 100644 index 0000000000..2060392478 --- /dev/null +++ b/lib/action_cable/connection/processor.rb @@ -0,0 +1,54 @@ +module ActionCable + module Connection + class Processor + def initialize(connection) + @connection = connection + @pending_messages = [] + end + + def handle(message) + if valid? message + if ready? + process message + else + queue message + end + end + end + + def ready? + @ready + end + + def ready! + @ready = true + handle_pending_messages + end + + private + attr_reader :connection + attr_accessor :pending_messages + + def process(message) + connection.worker_pool.async.invoke(connection, :receive, message) + end + + def queue(message) + pending_messages << message + end + + def valid?(message) + if message.is_a?(String) + true + else + connection.logger.error "Couldn't handle non-string message: #{message.class}" + false + end + end + + def handle_pending_messages + process pending_messages.shift until pending_messages.empty? + end + end + end +end \ No newline at end of file -- cgit v1.2.3 From d796d9a61e1208f0706642ff02f7c8236185e55a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 15:47:32 +0200 Subject: Finish Processor class extraction --- lib/action_cable/connection.rb | 1 + lib/action_cable/connection/base.rb | 31 ++++++++----------------------- lib/action_cable/connection/processor.rb | 2 +- 3 files changed, 10 insertions(+), 24 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 991dd85c57..5928a47949 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -4,6 +4,7 @@ module ActionCable autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' + autoload :Processor, 'action_cable/connection/processor' autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 1fdc6f0fe8..c3c99dcec4 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -29,7 +29,7 @@ module ActionCable @websocket.on(:open) do |event| heartbeat.start - worker_pool.async.invoke(self, :on_open) + send_async :on_open end @websocket.on(:message) do |event| @@ -40,7 +40,7 @@ module ActionCable logger.info finished_request_message heartbeat.stop - worker_pool.async.invoke(self, :on_close) + send_async :on_close end @websocket.rack_response @@ -77,6 +77,10 @@ module ActionCable end + def send_async(method, *arguments) + worker_pool.async.invoke(self, method, *arguments) + end + def statistics { identifier: connection_identifier, @@ -97,7 +101,7 @@ module ActionCable private - attr_reader :heartbeat, :subscriptions + attr_reader :heartbeat, :subscriptions, :processor def on_open server.add_connection(self) @@ -105,26 +109,7 @@ module ActionCable connect if respond_to?(:connect) subscribe_to_internal_channel - ready_to_accept_messages - process_pending_messages - end - - - def accepting_messages? - @accept_messages - end - - def ready_to_accept_messages - @accept_messages = true - end - - def queue_message(message) - @pending_messages ||= [] - @pending_messages << message - end - - def process_pending_messages - worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty? + processor.ready! end diff --git a/lib/action_cable/connection/processor.rb b/lib/action_cable/connection/processor.rb index 2060392478..3191be4a4c 100644 --- a/lib/action_cable/connection/processor.rb +++ b/lib/action_cable/connection/processor.rb @@ -30,7 +30,7 @@ module ActionCable attr_accessor :pending_messages def process(message) - connection.worker_pool.async.invoke(connection, :receive, message) + connection.send_async :receive, message end def queue(message) -- cgit v1.2.3 From 09974941ccc7f782d163197d1a96440fcc811e85 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:01:41 +0200 Subject: Extract helper method --- lib/action_cable/connection/base.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index c3c99dcec4..4bc6a14aaa 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -53,7 +53,7 @@ module ActionCable def receive(data_in_json) if websocket_alive? - data = ActiveSupport::JSON.decode data_in_json + data = decode_json data_in_json case data['command'] when 'subscribe' then subscriptions.add data @@ -123,13 +123,17 @@ module ActionCable def process_message(message) - subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data'])) + subscriptions.find(message['identifier']).perform_action decode_json(message['data']) rescue Exception => e logger.error "Could not process message (#{message.inspect})" log_exception(e) end + def decode_json(json) + ActiveSupport::JSON.decode json + end + def respond_to_invalid_request [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end -- cgit v1.2.3 From 24609f18f54938988035a97eb09ccfe309cf8710 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:03:34 +0200 Subject: Rename Processor to MessageBuffer --- lib/action_cable/connection.rb | 2 +- lib/action_cable/connection/base.rb | 12 +++--- lib/action_cable/connection/message_buffer.rb | 51 +++++++++++++++++++++++++ lib/action_cable/connection/processor.rb | 54 --------------------------- 4 files changed, 58 insertions(+), 61 deletions(-) create mode 100644 lib/action_cable/connection/message_buffer.rb delete mode 100644 lib/action_cable/connection/processor.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 5928a47949..09ef1699a6 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -4,7 +4,7 @@ module ActionCable autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' - autoload :Processor, 'action_cable/connection/processor' + autoload :MessageBuffer, 'action_cable/connection/message_buffer' autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 4bc6a14aaa..da1fe380e2 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -16,9 +16,9 @@ module ActionCable @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) - @heartbeat = ActionCable::Connection::Heartbeat.new(self) - @subscriptions = ActionCable::Connection::Subscriptions.new(self) - @processor = ActionCable::Connection::Processor.new(self) + @heartbeat = ActionCable::Connection::Heartbeat.new(self) + @subscriptions = ActionCable::Connection::Subscriptions.new(self) + @message_buffer = ActionCable::Connection::MessageBuffer.new(self) end def process @@ -33,7 +33,7 @@ module ActionCable end @websocket.on(:message) do |event| - processor.handle event.data + message_buffer.append event.data end @websocket.on(:close) do |event| @@ -101,7 +101,7 @@ module ActionCable private - attr_reader :heartbeat, :subscriptions, :processor + attr_reader :heartbeat, :subscriptions, :message_buffer def on_open server.add_connection(self) @@ -109,7 +109,7 @@ module ActionCable connect if respond_to?(:connect) subscribe_to_internal_channel - processor.ready! + message_buffer.process! end diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb new file mode 100644 index 0000000000..615266e0cb --- /dev/null +++ b/lib/action_cable/connection/message_buffer.rb @@ -0,0 +1,51 @@ +module ActionCable + module Connection + class MessageBuffer + def initialize(connection) + @connection = connection + @buffered_messages = [] + end + + def append(message) + if valid? message + if processing? + receive message + else + buffer message + end + else + connection.logger.error "Couldn't handle non-string message: #{message.class}" + end + end + + def processing? + @processing + end + + def process! + @processing = true + receive_buffered_messages + end + + private + attr_reader :connection + attr_accessor :buffered_messages + + def valid?(message) + message.is_a?(String) + end + + def receive(message) + connection.send_async :receive, message + end + + def buffer(message) + buffered_messages << message + end + + def receive_buffered_messages + receive buffered_messages.shift until buffered_messages.empty? + end + end + end +end \ No newline at end of file diff --git a/lib/action_cable/connection/processor.rb b/lib/action_cable/connection/processor.rb deleted file mode 100644 index 3191be4a4c..0000000000 --- a/lib/action_cable/connection/processor.rb +++ /dev/null @@ -1,54 +0,0 @@ -module ActionCable - module Connection - class Processor - def initialize(connection) - @connection = connection - @pending_messages = [] - end - - def handle(message) - if valid? message - if ready? - process message - else - queue message - end - end - end - - def ready? - @ready - end - - def ready! - @ready = true - handle_pending_messages - end - - private - attr_reader :connection - attr_accessor :pending_messages - - def process(message) - connection.send_async :receive, message - end - - def queue(message) - pending_messages << message - end - - def valid?(message) - if message.is_a?(String) - true - else - connection.logger.error "Couldn't handle non-string message: #{message.class}" - false - end - end - - def handle_pending_messages - process pending_messages.shift until pending_messages.empty? - end - end - end -end \ No newline at end of file -- cgit v1.2.3 From aaad3ea707a7ed28bbf4591f1b7b1bdde62714c4 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:09:37 +0200 Subject: Slim down the web socket respond blocks Move heartbeat into on_open/close and add a similarly named on_message to handle that callback. --- lib/action_cable/connection/base.rb | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index da1fe380e2..e97d40c941 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -27,26 +27,12 @@ module ActionCable if websocket? @websocket = Faye::WebSocket.new(@env) - @websocket.on(:open) do |event| - heartbeat.start - send_async :on_open - end - - @websocket.on(:message) do |event| - message_buffer.append event.data - end - - @websocket.on(:close) do |event| - logger.info finished_request_message - - heartbeat.stop - send_async :on_close - end + @websocket.on(:open) { |event| send_async :on_open } + @websocket.on(:message) { |event| on_message event.data } + @websocket.on(:close) { |event| send_async :on_close } @websocket.rack_response else - logger.info finished_request_message - respond_to_invalid_request end end @@ -108,16 +94,24 @@ module ActionCable connect if respond_to?(:connect) subscribe_to_internal_channel + heartbeat.start message_buffer.process! end + def on_message(message) + message_buffer.append event.data + end def on_close + logger.info finished_request_message + server.remove_connection(self) subscriptions.cleanup unsubscribe_from_internal_channel + heartbeat.stop + disconnect if respond_to?(:disconnect) end @@ -135,6 +129,7 @@ module ActionCable end def respond_to_invalid_request + logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end -- cgit v1.2.3 From a80c8c0e3943001e039e49f9aa91f55eeeb65f5a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:11:15 +0200 Subject: Fix reference --- lib/action_cable/connection/base.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index e97d40c941..1548447d74 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -100,7 +100,7 @@ module ActionCable end def on_message(message) - message_buffer.append event.data + message_buffer.append message end def on_close -- cgit v1.2.3 From 05c3ba113c752c1aebc09260bd0ce36f9e3b722b Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:13:09 +0200 Subject: Use private accessor --- lib/action_cable/connection/base.rb | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 1548447d74..69102aeaa3 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -27,11 +27,11 @@ module ActionCable if websocket? @websocket = Faye::WebSocket.new(@env) - @websocket.on(:open) { |event| send_async :on_open } - @websocket.on(:message) { |event| on_message event.data } - @websocket.on(:close) { |event| send_async :on_close } - - @websocket.rack_response + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + + websocket.rack_response else respond_to_invalid_request end @@ -54,12 +54,12 @@ module ActionCable end def transmit(data) - @websocket.send data + websocket.send data end def close logger.error "Closing connection" - @websocket.close + websocket.close end @@ -87,6 +87,7 @@ module ActionCable private + attr_reader :websocket attr_reader :heartbeat, :subscriptions, :message_buffer def on_open @@ -134,7 +135,7 @@ module ActionCable end def websocket_alive? - @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end def websocket? -- cgit v1.2.3 From a7607928e341eea7740b27f3ae507c26c7a68c56 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:19:19 +0200 Subject: Style --- lib/action_cable/connection/base.rb | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 69102aeaa3..7db554294a 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -68,11 +68,7 @@ module ActionCable end def statistics - { - identifier: connection_identifier, - started_at: @started_at, - subscriptions: subscriptions.identifiers - } + { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers } end -- cgit v1.2.3 From 82f13443a508600a94319ce0e636d04f0ed4673e Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:19:30 +0200 Subject: Spacing --- lib/action_cable/connection/base.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 7db554294a..5951198f36 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -120,16 +120,17 @@ module ActionCable log_exception(e) end - def decode_json(json) ActiveSupport::JSON.decode json end + def respond_to_invalid_request logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end + def websocket_alive? websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end @@ -138,6 +139,7 @@ module ActionCable @is_websocket ||= Faye::WebSocket.websocket?(@env) end + def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, @@ -155,6 +157,7 @@ module ActionCable Time.now.to_default_s ] end + def log_exception(e) logger.error "There was an exception: #{e.class} - #{e.message}" logger.error e.backtrace.join("\n") -- cgit v1.2.3 From 72c16340bff9a79eecc2dd5e9291b199f5ae32ea Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:20:06 +0200 Subject: Extract execute_command method and centralize exception handling --- lib/action_cable/connection/base.rb | 22 ++++++++++------------ lib/action_cable/connection/subscriptions.rb | 2 -- 2 files changed, 10 insertions(+), 14 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 5951198f36..aa3eb6472d 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -39,15 +39,7 @@ module ActionCable def receive(data_in_json) if websocket_alive? - data = decode_json data_in_json - - case data['command'] - when 'subscribe' then subscriptions.add data - when 'unsubscribe' then subscriptions.remove data - when 'message' then process_message data - else - logger.error "Received unrecognized command in #{data.inspect}" - end + execute_command decode_json(data_in_json) else logger.error "Received data without a live websocket (#{data.inspect})" end @@ -113,10 +105,16 @@ module ActionCable end - def process_message(message) - subscriptions.find(message['identifier']).perform_action decode_json(message['data']) + def execute_command(data) + case data['command'] + when 'subscribe' then subscriptions.add data + when 'unsubscribe' then subscriptions.remove data + when 'message' then subscriptions.find(message['identifier']).perform_action decode_json(message['data']) + else + logger.error "Received unrecognized command in #{data.inspect}" + end rescue Exception => e - logger.error "Could not process message (#{message.inspect})" + logger.error "Could not execute command from #{data.inspect})" log_exception(e) end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 888b93a652..dbba8eca1d 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -19,8 +19,6 @@ module ActionCable else connection.logger.error "Subscription class not found (#{data.inspect})" end - rescue Exception => e - connection.logger.error "Could not subscribe to channel (#{data.inspect}) due to '#{e}': #{e.backtrace.join(' - ')}" end def remove(data) -- cgit v1.2.3 From 71ebc3aca6be37faf7bdd775667e23b9d759e4a6 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:25:23 +0200 Subject: Style --- lib/action_cable/connection/subscriptions.rb | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index dbba8eca1d..a37708d85f 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -40,9 +40,7 @@ module ActionCable end def cleanup - subscriptions.each do |id, channel| - channel.perform_disconnection - end + subscriptions.each { |id, channel| channel.perform_disconnection } end private -- cgit v1.2.3 From 04aed03c896f661143bce1e4b879cff480963fe6 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:25:33 +0200 Subject: Use delegated logger --- lib/action_cable/connection/subscriptions.rb | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index a37708d85f..d6525b61c3 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -17,12 +17,12 @@ module ActionCable if subscription_klass subscriptions[id_key] = subscription_klass.new(connection, id_key, id_options) else - connection.logger.error "Subscription class not found (#{data.inspect})" + logger.error "Subscription class not found (#{data.inspect})" end end def remove(data) - connection.logger.info "Unsubscribing from channel: #{data['identifier']}" + logger.info "Unsubscribing from channel: #{data['identifier']}" subscriptions[data['identifier']].perform_disconnection subscriptions.delete(data['identifier']) end @@ -45,6 +45,7 @@ module ActionCable private attr_reader :connection, :subscriptions + delegate :logger, to: :connection end end end \ No newline at end of file -- cgit v1.2.3 From 82f1e19674b290fcee32a048707055e9b82aa310 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:28:53 +0200 Subject: Feature envy detected, so move execute_command to Subscriptions --- lib/action_cable/connection/base.rb | 23 ++------------------- lib/action_cable/connection/subscriptions.rb | 31 ++++++++++++++++++++++------ 2 files changed, 27 insertions(+), 27 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index aa3eb6472d..7c79abcb89 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -39,7 +39,7 @@ module ActionCable def receive(data_in_json) if websocket_alive? - execute_command decode_json(data_in_json) + subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else logger.error "Received data without a live websocket (#{data.inspect})" end @@ -105,24 +105,6 @@ module ActionCable end - def execute_command(data) - case data['command'] - when 'subscribe' then subscriptions.add data - when 'unsubscribe' then subscriptions.remove data - when 'message' then subscriptions.find(message['identifier']).perform_action decode_json(message['data']) - else - logger.error "Received unrecognized command in #{data.inspect}" - end - rescue Exception => e - logger.error "Could not execute command from #{data.inspect})" - log_exception(e) - end - - def decode_json(json) - ActiveSupport::JSON.decode json - end - - def respond_to_invalid_request logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] @@ -157,8 +139,7 @@ module ActionCable def log_exception(e) - logger.error "There was an exception: #{e.class} - #{e.message}" - logger.error e.backtrace.join("\n") + logger.error "Exception raised #{e.class} - #{e.message}: #{e.backtrace.first(5).join(" | ")}" end def log_tags diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index d6525b61c3..e0a3a133c5 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -6,6 +6,19 @@ module ActionCable @subscriptions = {} end + def execute_command(data) + case data['command'] + when 'subscribe' then add data + when 'unsubscribe' then remove data + when 'message' then perform_action data + else + logger.error "Received unrecognized command in #{data.inspect}" + end + rescue Exception => e + logger.error "Could not execute command from #{data.inspect})" + connection.log_exception(e) + end + def add(data) id_key = data['identifier'] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access @@ -27,14 +40,11 @@ module ActionCable subscriptions.delete(data['identifier']) end - def find(identifier) - if subscription = subscriptions[identifier] - subscription - else - raise "Unable to find subscription with identifier: #{identifier}" - end + def perform_action(data) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) end + def identifiers subscriptions.keys end @@ -43,9 +53,18 @@ module ActionCable subscriptions.each { |id, channel| channel.perform_disconnection } end + private attr_reader :connection, :subscriptions delegate :logger, to: :connection + + def find(data) + if subscription = subscriptions[data['identifier']] + subscription + else + raise "Unable to find subscription with identifier: #{identifier}" + end + end end end end \ No newline at end of file -- cgit v1.2.3 From f91e39429a4f08e4d78196ddcb12dc2930d07d92 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:52:41 +0200 Subject: Clarify what websocket thing we're talking about --- lib/action_cable/connection/base.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 7c79abcb89..0fb98d7293 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -24,8 +24,8 @@ module ActionCable def process logger.info started_request_message - if websocket? @websocket = Faye::WebSocket.new(@env) + if websocket_request? websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } @@ -115,8 +115,8 @@ module ActionCable websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end - def websocket? - @is_websocket ||= Faye::WebSocket.websocket?(@env) + def websocket_request? + @is_websocket ||= Faye::WebSocket.websocket_request?(@env) end @@ -124,7 +124,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket? ? ' [Websocket]' : '', + websocket_request? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end @@ -132,7 +132,7 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket? ? ' [Websocket]' : '', + websocket_request? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end -- cgit v1.2.3 From 6726c11cda7a970ca02fd690ea7b5063fcfba7bc Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:53:00 +0200 Subject: Composed method to same order of abstraction --- lib/action_cable/connection/base.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 0fb98d7293..252b71e847 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -24,8 +24,8 @@ module ActionCable def process logger.info started_request_message - @websocket = Faye::WebSocket.new(@env) if websocket_request? + websocket_initialization websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } @@ -111,6 +111,10 @@ module ActionCable end + def websocket_initialization + @websocket = Faye::WebSocket.new(@env) + end + def websocket_alive? websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end -- cgit v1.2.3 From b9fcaa7cbcad9a4ae2e56e1907764b6eae4a94c6 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 16:56:58 +0200 Subject: Fix method --- lib/action_cable/connection/base.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 252b71e847..fe5f058824 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -120,7 +120,7 @@ module ActionCable end def websocket_request? - @is_websocket ||= Faye::WebSocket.websocket_request?(@env) + @is_websocket ||= Faye::WebSocket.websocket?(@env) end -- cgit v1.2.3 From a66c56210c844ba51452fbf7a0aa01175ea3eb6f Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 21:34:06 +0200 Subject: Fix RemoteConnection due to refactoring breakage --- lib/action_cable/remote_connection.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb index 912fb6eb57..b6fdf226e3 100644 --- a/lib/action_cable/remote_connection.rb +++ b/lib/action_cable/remote_connection.rb @@ -2,7 +2,7 @@ module ActionCable class RemoteConnection class InvalidIdentifiersError < StandardError; end - include Connection::Identifier + include Connection::Identification, Connection::InternalChannel def initialize(server, ids) @server = server @@ -10,8 +10,7 @@ module ActionCable end def disconnect - message = { type: 'disconnect' }.to_json - redis.publish(internal_redis_channel, message) + redis.publish internal_redis_channel, { type: 'disconnect' }.to_json end def identifiers -- cgit v1.2.3 From 268ee5208ce513eb0b74e2354259e7991d1633c9 Mon Sep 17 00:00:00 2001 From: Javan Makhmali Date: Wed, 24 Jun 2015 14:26:26 -0400 Subject: Create JavaScript channels identified by their Ruby class name --- lib/action_cable/connection/subscriptions.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index e0a3a133c5..992def173e 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -24,7 +24,7 @@ module ActionCable id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access subscription_klass = connection.server.registered_channels.detect do |channel_klass| - channel_klass.find_name == id_options[:channel] + channel_klass == id_options[:channel].safe_constantize end if subscription_klass @@ -67,4 +67,4 @@ module ActionCable end end end -end \ No newline at end of file +end -- cgit v1.2.3 From 88585965ec00bbe9fe41bbe468bfbbf6dc0f9d89 Mon Sep 17 00:00:00 2001 From: Javan Makhmali Date: Wed, 24 Jun 2015 14:40:51 -0400 Subject: Remove now unused channel_name --- lib/action_cable/channel/base.rb | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 83ba2cb3d2..6c55a8ed65 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -10,16 +10,10 @@ module ActionCable attr_reader :params, :connection delegate :logger, to: :connection - class_attribute :channel_name - class << self def matches?(identifier) raise "Please implement #{name}#matches? method" end - - def find_name - @name ||= channel_name || to_s.demodulize.underscore - end end def initialize(connection, channel_identifier, params = {}) @@ -138,4 +132,4 @@ module ActionCable end end end -end \ No newline at end of file +end -- cgit v1.2.3 From 85272d8a91aa2a08a4f83100e24cb1b0c8c9ccf3 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Fri, 26 Jun 2015 19:11:21 +0200 Subject: Don't need a log_exception helper, just do it inline --- lib/action_cable/connection/base.rb | 4 ---- lib/action_cable/connection/subscriptions.rb | 3 +-- 2 files changed, 1 insertion(+), 6 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index fe5f058824..aac23b2596 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -142,10 +142,6 @@ module ActionCable end - def log_exception(e) - logger.error "Exception raised #{e.class} - #{e.message}: #{e.backtrace.first(5).join(" | ")}" - end - def log_tags server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } end diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index e0a3a133c5..9e7a8a5f73 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -15,8 +15,7 @@ module ActionCable logger.error "Received unrecognized command in #{data.inspect}" end rescue Exception => e - logger.error "Could not execute command from #{data.inspect})" - connection.log_exception(e) + logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}" end def add(data) -- cgit v1.2.3 From e3cb3696cfaa766b62d644411fe71e4e64aab85a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Fri, 26 Jun 2015 19:11:31 +0200 Subject: TOC refactor --- lib/action_cable/connection/base.rb | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index aac23b2596..4af651a5d9 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -78,6 +78,20 @@ module ActionCable attr_reader :websocket attr_reader :heartbeat, :subscriptions, :message_buffer + + def websocket_initialization + @websocket = Faye::WebSocket.new(@env) + end + + def websocket_alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + end + + def websocket_request? + @is_websocket ||= Faye::WebSocket.websocket?(@env) + end + + def on_open server.add_connection(self) @@ -111,19 +125,6 @@ module ActionCable end - def websocket_initialization - @websocket = Faye::WebSocket.new(@env) - end - - def websocket_alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN - end - - def websocket_request? - @is_websocket ||= Faye::WebSocket.websocket?(@env) - end - - def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, -- cgit v1.2.3 From 4cef27aacffb2ce5aada0c7199e9eb8787291baf Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 15:59:12 +0200 Subject: Explain the purpose --- lib/action_cable/connection/tagged_logger_proxy.rb | 3 +++ 1 file changed, 3 insertions(+) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb index e9e12e2672..e0c0075adf 100644 --- a/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/lib/action_cable/connection/tagged_logger_proxy.rb @@ -1,5 +1,8 @@ module ActionCable module Connection + # Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional + # ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests. + # The connection is long-lived, so it needs its own set of tags for its independent duration. class TaggedLoggerProxy def initialize(logger, tags:) @logger = logger -- cgit v1.2.3 From a2d55dfdc3878521793a8472c00b7a648ff21ae3 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:18:26 +0200 Subject: Use an encapsulated factory method --- lib/action_cable/connection/base.rb | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 4af651a5d9..a231288b4b 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -14,7 +14,7 @@ module ActionCable @server, @env = server, env - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + @logger = initialize_tagged_logger @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @@ -143,8 +143,10 @@ module ActionCable end - def log_tags - server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } + # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. + def initialize_tagged_logger + TaggedLoggerProxy.new server.logger, + tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } end end end -- cgit v1.2.3 From 3dd19d9d3c4bf678d45230485403e7460e75373f Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:18:38 +0200 Subject: Better order --- lib/action_cable/connection/base.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index a231288b4b..ba1a486afb 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -10,8 +10,6 @@ module ActionCable attr_reader :logger def initialize(server, env) - @started_at = Time.now - @server, @env = server, env @logger = initialize_tagged_logger @@ -19,6 +17,8 @@ module ActionCable @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) + + @started_at = Time.now end def process -- cgit v1.2.3 From 78f3c88d69741ffd9b24da8d362f3f7c4c8454f8 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:27:08 +0200 Subject: Better ordering --- lib/action_cable/connection/base.rb | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index ba1a486afb..ae9dd58ab4 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -78,7 +78,6 @@ module ActionCable attr_reader :websocket attr_reader :heartbeat, :subscriptions, :message_buffer - def websocket_initialization @websocket = Faye::WebSocket.new(@env) end @@ -125,6 +124,12 @@ module ActionCable end + # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. + def initialize_tagged_logger + TaggedLoggerProxy.new server.logger, + tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } + end + def started_request_message 'Started %s "%s"%s for %s at %s' % [ request.request_method, @@ -141,13 +146,6 @@ module ActionCable request.ip, Time.now.to_default_s ] end - - - # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. - def initialize_tagged_logger - TaggedLoggerProxy.new server.logger, - tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } - end end end end -- cgit v1.2.3 From 321d04ff56e2f17ef7285141252dba8ff5cdecca Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:50:05 +0200 Subject: Add WebSocket decorator --- lib/action_cable/connection.rb | 1 + lib/action_cable/connection/base.rb | 28 +++++++--------------------- lib/action_cable/connection/web_socket.rb | 27 +++++++++++++++++++++++++++ lib/action_cable/server.rb | 2 +- 4 files changed, 36 insertions(+), 22 deletions(-) create mode 100644 lib/action_cable/connection/web_socket.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 09ef1699a6..1b4a6ecc23 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -5,6 +5,7 @@ module ActionCable autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' autoload :MessageBuffer, 'action_cable/connection/message_buffer' + autoload :WebSocket, 'action_cable/connection/web_socket' autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index ae9dd58ab4..efabe40b73 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -14,6 +14,7 @@ module ActionCable @logger = initialize_tagged_logger + @websocket = ActionCable::Connection::WebSocket.new(env) @heartbeat = ActionCable::Connection::Heartbeat.new(self) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -21,12 +22,10 @@ module ActionCable @started_at = Time.now end - def process + def response logger.info started_request_message - if websocket_request? - websocket_initialization - + if websocket.possible? websocket.on(:open) { |event| send_async :on_open } websocket.on(:message) { |event| on_message event.data } websocket.on(:close) { |event| send_async :on_close } @@ -38,7 +37,7 @@ module ActionCable end def receive(data_in_json) - if websocket_alive? + if websocket.alive? subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) else logger.error "Received data without a live websocket (#{data.inspect})" @@ -46,7 +45,7 @@ module ActionCable end def transmit(data) - websocket.send data + websocket.transmit data end def close @@ -78,19 +77,6 @@ module ActionCable attr_reader :websocket attr_reader :heartbeat, :subscriptions, :message_buffer - def websocket_initialization - @websocket = Faye::WebSocket.new(@env) - end - - def websocket_alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN - end - - def websocket_request? - @is_websocket ||= Faye::WebSocket.websocket?(@env) - end - - def on_open server.add_connection(self) @@ -134,7 +120,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket_request? ? ' [Websocket]' : '', + websocket.possible? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end @@ -142,7 +128,7 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket_request? ? ' [Websocket]' : '', + websocket.possible? ? ' [Websocket]' : '', request.ip, Time.now.to_default_s ] end diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb new file mode 100644 index 0000000000..135a28cfe4 --- /dev/null +++ b/lib/action_cable/connection/web_socket.rb @@ -0,0 +1,27 @@ +module ActionCable + module Connection + # Decorate the Faye::WebSocket with helpers we need. + class WebSocket + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + end + + def possible? + websocket + end + + def alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + websocket.send data + end + + private + attr_reader :websocket + end + end +end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 3a16f51757..dbfadaa34c 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -17,7 +17,7 @@ module ActionCable end def call(env) - @connection_class.new(self, env).process + @connection_class.new(self, env).response end def worker_pool -- cgit v1.2.3 From 3c333f1a22c1b4f0ae42161df1ce9b4c4730999d Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:54:08 +0200 Subject: Change back, more is happening than just response --- lib/action_cable/connection/base.rb | 2 +- lib/action_cable/server.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index efabe40b73..e5d63abe5b 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -22,7 +22,7 @@ module ActionCable @started_at = Time.now end - def response + def process logger.info started_request_message if websocket.possible? diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index dbfadaa34c..3a16f51757 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -17,7 +17,7 @@ module ActionCable end def call(env) - @connection_class.new(self, env).response + @connection_class.new(self, env).process end def worker_pool -- cgit v1.2.3 From 98c1ce0aec6d996836a1c38dc8ebd1caeb49240d Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:54:20 +0200 Subject: Composed method on the response --- lib/action_cable/connection/base.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index e5d63abe5b..2da1b74c76 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -30,7 +30,7 @@ module ActionCable websocket.on(:message) { |event| on_message event.data } websocket.on(:close) { |event| send_async :on_close } - websocket.rack_response + respond_to_successful_request else respond_to_invalid_request end @@ -104,6 +104,10 @@ module ActionCable end + def respond_to_successful_request + websocket.rack_response + end + def respond_to_invalid_request logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] -- cgit v1.2.3 From 1e4b1ca1bc9769c50f5b3716678bc580562333e1 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:54:58 +0200 Subject: initialize -> new --- lib/action_cable/connection/base.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 2da1b74c76..69c0db9167 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -12,7 +12,7 @@ module ActionCable def initialize(server, env) @server, @env = server, env - @logger = initialize_tagged_logger + @logger = new_tagged_logger @websocket = ActionCable::Connection::WebSocket.new(env) @heartbeat = ActionCable::Connection::Heartbeat.new(self) @@ -115,7 +115,7 @@ module ActionCable # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags. - def initialize_tagged_logger + def new_tagged_logger TaggedLoggerProxy.new server.logger, tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize } end -- cgit v1.2.3 From 16849a7e68ed7bd50e47bb429a30d6dcedf1979b Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sat, 27 Jun 2015 16:57:32 +0200 Subject: Use accessor --- lib/action_cable/connection/internal_channel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb index 885457d8c2..70e5e58373 100644 --- a/lib/action_cable/connection/internal_channel.rb +++ b/lib/action_cable/connection/internal_channel.rb @@ -31,7 +31,7 @@ module ActionCable case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" - @websocket.close + websocket.close end rescue Exception => e logger.error "There was an exception - #{e.class}(#{e.message})" -- cgit v1.2.3 From f61467ec5b90ebb75987a13f763b6a19548d84b3 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:16:54 +0200 Subject: Move server classes to its own namespace --- lib/action_cable/server.rb | 75 ++------------------------------------ lib/action_cable/server/base.rb | 77 +++++++++++++++++++++++++++++++++++++++ lib/action_cable/server/worker.rb | 32 ++++++++++++++++ lib/action_cable/worker.rb | 30 --------------- 4 files changed, 112 insertions(+), 102 deletions(-) create mode 100644 lib/action_cable/server/base.rb create mode 100644 lib/action_cable/server/worker.rb delete mode 100644 lib/action_cable/worker.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 3a16f51757..e17cf872e0 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,75 +1,6 @@ module ActionCable - class Server - cattr_accessor(:logger, instance_reader: true) { Rails.logger } - - attr_accessor :registered_channels, :redis_config, :log_tags - - def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) - @redis_config = redis_config.with_indifferent_access - @registered_channels = Set.new(channels) - @worker_pool_size = worker_pool_size - @connection_class = connection - @log_tags = log_tags - - @connections = [] - - logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" - end - - def call(env) - @connection_class.new(self, env).process - end - - def worker_pool - @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size) - end - - def pubsub - @pubsub ||= redis.pubsub - end - - def redis - @redis ||= begin - redis = EM::Hiredis.connect(@redis_config[:url]) - redis.on(:reconnect_failed) do - logger.info "[ActionCable] Redis reconnect failed." - # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close - end - redis - end - end - - def threaded_redis - @threaded_redis ||= Redis.new(redis_config) - end - - def remote_connections - @remote_connections ||= RemoteConnections.new(self) - end - - def broadcaster_for(channel) - Broadcaster.new(self, channel) - end - - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end - - def connection_identifiers - @connection_class.identifiers - end - - def add_connection(connection) - @connections << connection - end - - def remove_connection(connection) - @connections.delete connection - end - - def open_connections_statistics - @connections.map(&:statistics) - end + module Server + autoload :Base, 'action_cable/server/base' + autoload :Worker, 'action_cable/server/worker' end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb new file mode 100644 index 0000000000..6abec92dc1 --- /dev/null +++ b/lib/action_cable/server/base.rb @@ -0,0 +1,77 @@ +module ActionCable + module Server + class Base + cattr_accessor(:logger, instance_reader: true) { Rails.logger } + + attr_accessor :registered_channels, :redis_config, :log_tags + + def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) + @redis_config = redis_config.with_indifferent_access + @registered_channels = Set.new(channels) + @worker_pool_size = worker_pool_size + @connection_class = connection + @log_tags = log_tags + + @connections = [] + + logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" + end + + def call(env) + @connection_class.new(self, env).process + end + + def worker_pool + @worker_pool ||= ActionCable::Server::Worker.pool(size: @worker_pool_size) + end + + def pubsub + @pubsub ||= redis.pubsub + end + + def redis + @redis ||= begin + redis = EM::Hiredis.connect(@redis_config[:url]) + redis.on(:reconnect_failed) do + logger.info "[ActionCable] Redis reconnect failed." + # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." + # @connections.map &:close + end + redis + end + end + + def threaded_redis + @threaded_redis ||= Redis.new(redis_config) + end + + def remote_connections + @remote_connections ||= RemoteConnections.new(self) + end + + def broadcaster_for(channel) + Broadcaster.new(self, channel) + end + + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + + def connection_identifiers + @connection_class.identifiers + end + + def add_connection(connection) + @connections << connection + end + + def remove_connection(connection) + @connections.delete connection + end + + def open_connections_statistics + @connections.map(&:statistics) + end + end + end +end \ No newline at end of file diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb new file mode 100644 index 0000000000..0491cb9ab0 --- /dev/null +++ b/lib/action_cable/server/worker.rb @@ -0,0 +1,32 @@ +module ActionCable + module Server + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def invoke(receiver, method, *args) + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end + + def run_periodic_timer(channel, callback) + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + end + + private + def logger + ActionCable::Server::Base.logger + end + end + end +end \ No newline at end of file diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb deleted file mode 100644 index 6800a75d1d..0000000000 --- a/lib/action_cable/worker.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - class Worker - include ActiveSupport::Callbacks - include Celluloid - - define_callbacks :work - - def invoke(receiver, method, *args) - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") - - receiver.handle_exception if receiver.respond_to?(:handle_exception) - end - - def run_periodic_timer(channel, callback) - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - - private - def logger - ActionCable::Server.logger - end - end -end -- cgit v1.2.3 From e1a99a83ca135523ff8513be756f156500999cb8 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:24:50 +0200 Subject: Make broadcasting a concern --- lib/action_cable/broadcaster.rb | 17 ----------------- lib/action_cable/server.rb | 1 + lib/action_cable/server/base.rb | 10 ++-------- lib/action_cable/server/broadcasting.rb | 28 ++++++++++++++++++++++++++++ 4 files changed, 31 insertions(+), 25 deletions(-) delete mode 100644 lib/action_cable/broadcaster.rb create mode 100644 lib/action_cable/server/broadcasting.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/broadcaster.rb b/lib/action_cable/broadcaster.rb deleted file mode 100644 index 7d8cc90970..0000000000 --- a/lib/action_cable/broadcaster.rb +++ /dev/null @@ -1,17 +0,0 @@ -module ActionCable - class Broadcaster - attr_reader :server, :channel, :redis - delegate :logger, to: :server - - def initialize(server, channel) - @server = server - @channel = channel - @redis = @server.threaded_redis - end - - def broadcast(message) - logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" - redis.publish channel, message.to_json - end - end -end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index e17cf872e0..fa7bad4e32 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,6 +1,7 @@ module ActionCable module Server autoload :Base, 'action_cable/server/base' + autoload :Broadcasting, 'action_cable/server/broadcasting' autoload :Worker, 'action_cable/server/worker' end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 6abec92dc1..6dcd282e4a 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -1,6 +1,8 @@ module ActionCable module Server class Base + include ActionCable::Server::Broadcasting + cattr_accessor(:logger, instance_reader: true) { Rails.logger } attr_accessor :registered_channels, :redis_config, :log_tags @@ -49,14 +51,6 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end - def broadcaster_for(channel) - Broadcaster.new(self, channel) - end - - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end - def connection_identifiers @connection_class.identifiers end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb new file mode 100644 index 0000000000..682064571f --- /dev/null +++ b/lib/action_cable/server/broadcasting.rb @@ -0,0 +1,28 @@ +module ActionCable + module Server + module Broadcasting + def broadcaster_for(channel) + Broadcaster.new(self, channel) + end + + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + + class Broadcaster + attr_reader :server, :channel, :redis + delegate :logger, to: :server + + def initialize(server, channel) + @server, @channel = server, channel + @redis = @server.threaded_redis + end + + def broadcast(message) + logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" + redis.publish channel, message.to_json + end + end + end + end +end \ No newline at end of file -- cgit v1.2.3 From 8a2af53c8e83cd9258380fad4007e53f8721aa93 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:36:10 +0200 Subject: More redis used for broadcasting into broadcasting concern --- lib/action_cable/server/base.rb | 4 ---- lib/action_cable/server/broadcasting.rb | 32 ++++++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb index 6dcd282e4a..e8109b325d 100644 --- a/lib/action_cable/server/base.rb +++ b/lib/action_cable/server/base.rb @@ -43,10 +43,6 @@ module ActionCable end end - def threaded_redis - @threaded_redis ||= Redis.new(redis_config) - end - def remote_connections @remote_connections ||= RemoteConnections.new(self) end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 682064571f..691ec1b486 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,28 +1,32 @@ module ActionCable module Server module Broadcasting + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + def broadcaster_for(channel) Broadcaster.new(self, channel) end - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end + private + def redis_for_threads + @redis_for_threads ||= Redis.new(redis_config) + end - class Broadcaster - attr_reader :server, :channel, :redis - delegate :logger, to: :server + class Broadcaster + def initialize(server, channel) + @server, @channel = server, channel + end - def initialize(server, channel) - @server, @channel = server, channel - @redis = @server.threaded_redis - end + def broadcast(message, log: true) + server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" if log + server.redis_for_threads.publish channel, message.to_json + end - def broadcast(message) - logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" - redis.publish channel, message.to_json + private + attr_reader :server, :channel end - end end end end \ No newline at end of file -- cgit v1.2.3 From a5d6bc0eb527f6cfa61300e70fa9010544240cf9 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:38:05 +0200 Subject: Make the remote connection use the broadcaster as well --- lib/action_cable/remote_connection.rb | 10 ++++------ lib/action_cable/server/broadcasting.rb | 8 ++++++-- 2 files changed, 10 insertions(+), 8 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb index b6fdf226e3..d7a3f0125d 100644 --- a/lib/action_cable/remote_connection.rb +++ b/lib/action_cable/remote_connection.rb @@ -10,18 +10,16 @@ module ActionCable end def disconnect - redis.publish internal_redis_channel, { type: 'disconnect' }.to_json + server.broadcast_without_logging internal_redis_channel, type: 'disconnect' end def identifiers - @server.connection_identifiers - end - - def redis - @server.threaded_redis + server.connection_identifiers end private + attr_reader :server + def set_identifier_instance_vars(ids) raise InvalidIdentifiersError unless valid_identifiers?(ids) ids.each { |k,v| instance_variable_set("@#{k}", v) } diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 691ec1b486..0d591d03e4 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -19,8 +19,12 @@ module ActionCable @server, @channel = server, channel end - def broadcast(message, log: true) - server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" if log + def broadcast(message) + server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" + broadcast_without_logging(message) + end + + def broadcast_without_logging(message) server.redis_for_threads.publish channel, message.to_json end -- cgit v1.2.3 From 3e693e19c4ed3ad3fdb861d0c0d4c3abe118479c Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:42:36 +0200 Subject: Fix reference --- lib/action_cable/connection/subscriptions.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index ae191c7795..2672279828 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -61,7 +61,7 @@ module ActionCable if subscription = subscriptions[data['identifier']] subscription else - raise "Unable to find subscription with identifier: #{identifier}" + raise "Unable to find subscription with identifier: #{data['identifier']}" end end end -- cgit v1.2.3 From c2e2a94306e6b77b0a1dce9b453fbaa04a7f7446 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 20:42:49 +0200 Subject: Rejig for what's used --- lib/action_cable/server/broadcasting.rb | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 0d591d03e4..3fbaa05039 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -9,12 +9,14 @@ module ActionCable Broadcaster.new(self, channel) end - private - def redis_for_threads - @redis_for_threads ||= Redis.new(redis_config) - end + def broadcasting_redis + @broadcasting_redis ||= Redis.new(redis_config) + end + private class Broadcaster + attr_reader :server, :channel + def initialize(server, channel) @server, @channel = server, channel end @@ -25,11 +27,8 @@ module ActionCable end def broadcast_without_logging(message) - server.redis_for_threads.publish channel, message.to_json + server.broadcasting_redis.publish channel, message.to_json end - - private - attr_reader :server, :channel end end end -- cgit v1.2.3 From 5c4f07d34e82310e2ce9029ddaafb6603435da73 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 28 Jun 2015 21:17:16 +0200 Subject: Introduce Streams as the domain language for the pubsub channels Channels redeliver messages from --- lib/action_cable/channel.rb | 2 +- lib/action_cable/channel/base.rb | 2 +- lib/action_cable/channel/redis.rb | 37 ------------------------------ lib/action_cable/channel/streams.rb | 40 +++++++++++++++++++++++++++++++++ lib/action_cable/server/broadcasting.rb | 18 +++++++-------- 5 files changed, 51 insertions(+), 48 deletions(-) delete mode 100644 lib/action_cable/channel/redis.rb create mode 100644 lib/action_cable/channel/streams.rb (limited to 'lib/action_cable') diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb index 94cdc8d722..0432052514 100644 --- a/lib/action_cable/channel.rb +++ b/lib/action_cable/channel.rb @@ -1,7 +1,7 @@ module ActionCable module Channel autoload :Callbacks, 'action_cable/channel/callbacks' - autoload :Redis, 'action_cable/channel/redis' + autoload :Streams, 'action_cable/channel/streams' autoload :Base, 'action_cable/channel/base' end end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index 6c55a8ed65..39a5a7e795 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -2,7 +2,7 @@ module ActionCable module Channel class Base include Callbacks - include Redis + include Streams on_subscribe :start_periodic_timers on_unsubscribe :stop_periodic_timers diff --git a/lib/action_cable/channel/redis.rb b/lib/action_cable/channel/redis.rb deleted file mode 100644 index 0f77dc0418..0000000000 --- a/lib/action_cable/channel/redis.rb +++ /dev/null @@ -1,37 +0,0 @@ -module ActionCable - module Channel - module Redis - extend ActiveSupport::Concern - - included do - on_unsubscribe :unsubscribe_from_all_channels - delegate :pubsub, to: :connection - end - - def subscribe_to(redis_channel, callback = nil) - callback ||= default_subscription_callback(redis_channel) - @_redis_channels ||= [] - @_redis_channels << [ redis_channel, callback ] - - pubsub.subscribe(redis_channel, &callback) - logger.info "#{channel_name} subscribed to broadcasts from #{redis_channel}" - end - - def unsubscribe_from_all_channels - if @_redis_channels - @_redis_channels.each do |redis_channel, callback| - pubsub.unsubscribe_proc(redis_channel, callback) - logger.info "#{channel_name} unsubscribed to broadcasts from #{redis_channel}" - end - end - end - - protected - def default_subscription_callback(channel) - -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "broadcast from #{channel}" - end - end - end - end -end diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb new file mode 100644 index 0000000000..3eac776e61 --- /dev/null +++ b/lib/action_cable/channel/streams.rb @@ -0,0 +1,40 @@ +module ActionCable + module Channel + module Streams + extend ActiveSupport::Concern + + included do + on_unsubscribe :stop_all_streams + end + + def stream_from(broadcasting, callback = nil) + callback ||= default_stream_callback(broadcasting) + + streams << [ broadcasting, callback ] + pubsub.subscribe broadcasting, &callback + + logger.info "#{channel_name} is streaming from #{broadcasting}" + end + + def stop_all_streams + streams.each do |broadcasting, callback| + pubsub.unsubscribe_proc broadcasting, callback + logger.info "#{channel_name} stopped streaming from #{broadcasting}" + end + end + + private + delegate :pubsub, to: :connection + + def streams + @_streams ||= [] + end + + def default_stream_callback(broadcasting) + -> (message) do + transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + end + end + end + end +end diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 3fbaa05039..868d418ece 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -1,12 +1,12 @@ module ActionCable module Server module Broadcasting - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) + def broadcast(broadcasting, message) + broadcaster_for(broadcasting).broadcast(message) end - def broadcaster_for(channel) - Broadcaster.new(self, channel) + def broadcaster_for(broadcasting) + Broadcaster.new(self, broadcasting) end def broadcasting_redis @@ -15,19 +15,19 @@ module ActionCable private class Broadcaster - attr_reader :server, :channel + attr_reader :server, :broadcasting - def initialize(server, channel) - @server, @channel = server, channel + def initialize(server, broadcasting) + @server, @broadcasting = server, broadcasting end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{channel}: #{message}" + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" broadcast_without_logging(message) end def broadcast_without_logging(message) - server.broadcasting_redis.publish channel, message.to_json + server.broadcasting_redis.publish broadcasting, message.to_json end end end -- cgit v1.2.3 From 10323716a134bb86708f6a65280215f8a7f18a1a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Thu, 2 Jul 2015 15:41:50 +0200 Subject: Expose broadcast_without_logging at the top level --- lib/action_cable/server/broadcasting.rb | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'lib/action_cable') diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb index 868d418ece..b0e51b8ba8 100644 --- a/lib/action_cable/server/broadcasting.rb +++ b/lib/action_cable/server/broadcasting.rb @@ -5,6 +5,10 @@ module ActionCable broadcaster_for(broadcasting).broadcast(message) end + def broadcast_without_logging(broadcasting, message) + broadcaster_for(broadcasting).broadcast_without_logging(message) + end + def broadcaster_for(broadcasting) Broadcaster.new(self, broadcasting) end -- cgit v1.2.3 From 5de01033150b70982f23a42670c55348a7371c4b Mon Sep 17 00:00:00 2001 From: Javan Makhmali Date: Thu, 2 Jul 2015 11:52:23 -0400 Subject: Guard against duplicate subscriptions --- lib/action_cable/connection/subscriptions.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/action_cable') diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb index 2672279828..24ab1bdfbf 100644 --- a/lib/action_cable/connection/subscriptions.rb +++ b/lib/action_cable/connection/subscriptions.rb @@ -27,7 +27,7 @@ module ActionCable end if subscription_klass - subscriptions[id_key] = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) else logger.error "Subscription class not found (#{data.inspect})" end -- cgit v1.2.3