diff options
-rw-r--r-- | lib/action_cable/connection.rb | 1 | ||||
-rw-r--r-- | lib/action_cable/connection/base.rb | 46 | ||||
-rw-r--r-- | lib/action_cable/connection/subscriptions.rb | 54 |
3 files changed, 64 insertions, 37 deletions
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 8a695a3d0d..31480f220f 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -3,6 +3,7 @@ module ActionCable autoload :Base, 'action_cable/connection/base' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' + autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index ffef85ead2..df07c567fb 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -9,6 +9,8 @@ module ActionCable attr_reader :server, :env delegate :worker_pool, :pubsub, to: :server + attr_reader :subscriptions + attr_reader :logger def initialize(server, env) @@ -19,9 +21,9 @@ module ActionCable @accept_messages = false @pending_messages = [] - @subscriptions = {} - @logger = TaggedLoggerProxy.new(server.logger, tags: log_tags) + + @subscriptions = ActionCable::Connection::Subscriptions.new(self) end def process @@ -66,8 +68,8 @@ module ActionCable data = ActiveSupport::JSON.decode data_in_json case data['command'] - when 'subscribe' then subscribe_channel data - when 'unsubscribe' then unsubscribe_channel data + when 'subscribe' then subscriptions.add data + when 'unsubscribe' then subscriptions.remove data when 'message' then process_message data else logger.error "Received unrecognized command in #{data.inspect}" @@ -91,7 +93,7 @@ module ActionCable { identifier: connection_identifier, started_at: @started_at, - subscriptions: @subscriptions.keys + subscriptions: subscriptions.identifiers } end @@ -120,54 +122,24 @@ module ActionCable def on_close server.remove_connection(self) - cleanup_subscriptions + subscriptions.cleanup unsubscribe_from_internal_channel disconnect if respond_to?(:disconnect) end - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.perform_disconnection - end - end - def transmit_ping_timestamp transmit({ identifier: '_ping', message: Time.now.to_i }.to_json) end - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } - - if subscription_klass - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Subscription class not found (#{data.inspect})" - end - rescue Exception => e - logger.error "Could not subscribe to channel (#{data.inspect})" - log_exception(e) - end def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data']) - else - raise "Unable to process message because no subscription was found (#{message.inspect})" - end + subscriptions.find(message['identifier']).perform_action(ActiveSupport::JSON.decode(message['data'])) rescue Exception => e logger.error "Could not process message (#{message.inspect})" log_exception(e) end - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].perform_disconnection - @subscriptions.delete(data['identifier']) - end def invalid_request logger.info finished_request_message diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb new file mode 100644 index 0000000000..888b93a652 --- /dev/null +++ b/lib/action_cable/connection/subscriptions.rb @@ -0,0 +1,54 @@ +module ActionCable + module Connection + class Subscriptions + def initialize(connection) + @connection = connection + @subscriptions = {} + end + + def add(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = connection.server.registered_channels.detect do |channel_klass| + channel_klass.find_name == id_options[:channel] + end + + if subscription_klass + subscriptions[id_key] = subscription_klass.new(connection, id_key, id_options) + else + connection.logger.error "Subscription class not found (#{data.inspect})" + end + rescue Exception => e + connection.logger.error "Could not subscribe to channel (#{data.inspect}) due to '#{e}': #{e.backtrace.join(' - ')}" + end + + def remove(data) + connection.logger.info "Unsubscribing from channel: #{data['identifier']}" + subscriptions[data['identifier']].perform_disconnection + subscriptions.delete(data['identifier']) + end + + def find(identifier) + if subscription = subscriptions[identifier] + subscription + else + raise "Unable to find subscription with identifier: #{identifier}" + end + end + + def identifiers + subscriptions.keys + end + + def cleanup + subscriptions.each do |id, channel| + channel.perform_disconnection + end + end + + private + attr_reader :connection, :subscriptions + end + end +end
\ No newline at end of file |