aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid Heinemeier Hansson <david@loudthinking.com>2015-06-22 15:47:32 +0200
committerDavid Heinemeier Hansson <david@loudthinking.com>2015-06-22 15:47:32 +0200
commitd796d9a61e1208f0706642ff02f7c8236185e55a (patch)
tree165f2b15b30670f5bcd2111ca6c11fa45b44e4af
parent8115d25033a0af2d29b57e1e5a6afaa70038a3d4 (diff)
downloadrails-d796d9a61e1208f0706642ff02f7c8236185e55a.tar.gz
rails-d796d9a61e1208f0706642ff02f7c8236185e55a.tar.bz2
rails-d796d9a61e1208f0706642ff02f7c8236185e55a.zip
Finish Processor class extraction
-rw-r--r--lib/action_cable/connection.rb1
-rw-r--r--lib/action_cable/connection/base.rb31
-rw-r--r--lib/action_cable/connection/processor.rb2
3 files changed, 10 insertions, 24 deletions
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
index 991dd85c57..5928a47949 100644
--- a/lib/action_cable/connection.rb
+++ b/lib/action_cable/connection.rb
@@ -4,6 +4,7 @@ module ActionCable
autoload :Heartbeat, 'action_cable/connection/heartbeat'
autoload :Identification, 'action_cable/connection/identification'
autoload :InternalChannel, 'action_cable/connection/internal_channel'
+ autoload :Processor, 'action_cable/connection/processor'
autoload :Subscriptions, 'action_cable/connection/subscriptions'
autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
end
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index 1fdc6f0fe8..c3c99dcec4 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -29,7 +29,7 @@ module ActionCable
@websocket.on(:open) do |event|
heartbeat.start
- worker_pool.async.invoke(self, :on_open)
+ send_async :on_open
end
@websocket.on(:message) do |event|
@@ -40,7 +40,7 @@ module ActionCable
logger.info finished_request_message
heartbeat.stop
- worker_pool.async.invoke(self, :on_close)
+ send_async :on_close
end
@websocket.rack_response
@@ -77,6 +77,10 @@ module ActionCable
end
+ def send_async(method, *arguments)
+ worker_pool.async.invoke(self, method, *arguments)
+ end
+
def statistics
{
identifier: connection_identifier,
@@ -97,7 +101,7 @@ module ActionCable
private
- attr_reader :heartbeat, :subscriptions
+ attr_reader :heartbeat, :subscriptions, :processor
def on_open
server.add_connection(self)
@@ -105,26 +109,7 @@ module ActionCable
connect if respond_to?(:connect)
subscribe_to_internal_channel
- ready_to_accept_messages
- process_pending_messages
- end
-
-
- def accepting_messages?
- @accept_messages
- end
-
- def ready_to_accept_messages
- @accept_messages = true
- end
-
- def queue_message(message)
- @pending_messages ||= []
- @pending_messages << message
- end
-
- def process_pending_messages
- worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty?
+ processor.ready!
end
diff --git a/lib/action_cable/connection/processor.rb b/lib/action_cable/connection/processor.rb
index 2060392478..3191be4a4c 100644
--- a/lib/action_cable/connection/processor.rb
+++ b/lib/action_cable/connection/processor.rb
@@ -30,7 +30,7 @@ module ActionCable
attr_accessor :pending_messages
def process(message)
- connection.worker_pool.async.invoke(connection, :receive, message)
+ connection.send_async :receive, message
end
def queue(message)