From e7b1ced7a4fab45e3fc5851e5500426022fa0c47 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Sun, 21 Jun 2015 22:35:07 +0200 Subject: Extracted Subscriptions class --- lib/action_cable/connection/base.rb | 46 ++++++++----------------------------- 1 file changed, 9 insertions(+), 37 deletions(-) (limited to 'lib/action_cable/connection/base.rb') diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index ffef85ead2..df07c567fb 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -9,6 +9,8 @@ module ActionCable attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server + attr_reader :subscriptions + attr_reader :logger def initialize(server, env) @@ -19,9 +21,9 @@ module ActionCable @accept_messages = false @pending_messages = [] - @subscriptions = {} - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + + @subscriptions = ActionCable::Connection::Subscriptions.new(self) end def process @@ -66,8 +68,8 @@ module ActionCable data = ActiveSupport::JSON.decode data_in_json case data['command'] - when 'subscribe' then subscribe_channel data - when 'unsubscribe' then unsubscribe_channel data + when 'subscribe' then subscriptions.add data + when 'unsubscribe' then subscriptions.remove data when 'message' then process_message data else logger.error "Received unrecognized command in #{data.inspect}" @@ -91,7 +93,7 @@ module ActionCable { identifier: connection_identifier, started_at: @started_at, - subscriptions: @subscriptions.keys + subscriptions: subscriptions.identifiers } end @@ -120,54 +122,24 @@ module ActionCable def on_close server.remove_connection(self) - cleanup_subscriptions + subscriptions.cleanup unsubscribe_from_internal_channel disconnect if respond_to?(:disconnect) end - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.perform_disconnection - end - end - def transmit_ping_timestamp transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) end - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } - - if subscription_klass - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Subscription class not found (#{data.inspect})" - end - rescue Exception => e - logger.error "Could not subscribe to channel (#{data.inspect})" - log_exception(e) - end def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data']) - else - raise "Unable to process message because no subscription was found (#{message.inspect})" - end + subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data'])) rescue Exception => e logger.error "Could not process message (#{message.inspect})" log_exception(e) end - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].perform_disconnection - @subscriptions.delete(data['identifier']) - end def invalid_request logger.info finished_request_message -- cgit v1.2.3