From d796d9a61e1208f0706642ff02f7c8236185e55a Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 22 Jun 2015 15:47:32 +0200 Subject: Finish Processor class extraction --- lib/action_cable/connection.rb | 1 + lib/action_cable/connection/base.rb | 31 ++++++++----------------------- lib/action_cable/connection/processor.rb | 2 +- 3 files changed, 10 insertions(+), 24 deletions(-) diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 991dd85c57..5928a47949 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -4,6 +4,7 @@ module ActionCable autoload :Heartbeat, 'action_cable/connection/heartbeat' autoload :Identification, 'action_cable/connection/identification' autoload :InternalChannel, 'action_cable/connection/internal_channel' + autoload :Processor, 'action_cable/connection/processor' autoload :Subscriptions, 'action_cable/connection/subscriptions' autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy' end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index 1fdc6f0fe8..c3c99dcec4 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -29,7 +29,7 @@ module ActionCable @websocket.on(:open) do |event| heartbeat.start - worker_pool.async.invoke(self, :on_open) + send_async :on_open end @websocket.on(:message) do |event| @@ -40,7 +40,7 @@ module ActionCable logger.info finished_request_message heartbeat.stop - worker_pool.async.invoke(self, :on_close) + send_async :on_close end @websocket.rack_response @@ -77,6 +77,10 @@ module ActionCable end + def send_async(method, *arguments) + worker_pool.async.invoke(self, method, *arguments) + end + def statistics { identifier: connection_identifier, @@ -97,7 +101,7 @@ module ActionCable private - attr_reader :heartbeat, :subscriptions + attr_reader :heartbeat, :subscriptions, :processor def on_open server.add_connection(self) @@ -105,26 +109,7 @@ module ActionCable connect if respond_to?(:connect) subscribe_to_internal_channel - ready_to_accept_messages - process_pending_messages - end - - - def accepting_messages? - @accept_messages - end - - def ready_to_accept_messages - @accept_messages = true - end - - def queue_message(message) - @pending_messages ||= [] - @pending_messages << message - end - - def process_pending_messages - worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty? + processor.ready! end diff --git a/lib/action_cable/connection/processor.rb b/lib/action_cable/connection/processor.rb index 2060392478..3191be4a4c 100644 --- a/lib/action_cable/connection/processor.rb +++ b/lib/action_cable/connection/processor.rb @@ -30,7 +30,7 @@ module ActionCable attr_accessor :pending_messages def process(message) - connection.worker_pool.async.invoke(connection, :receive, message) + connection.send_async :receive, message end def queue(message) -- cgit v1.2.3