aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/connection/base.rb
diff options
context:
space:
mode:
authorDavid Heinemeier Hansson <david@loudthinking.com>2015-06-21 23:02:46 +0200
committerDavid Heinemeier Hansson <david@loudthinking.com>2015-06-21 23:02:46 +0200
commite0926038983177f5491e45cc338e5dc091e3a86d (patch)
tree46de9dbc83f22f4dd305b5e01992b30053c8ecf3 /lib/action_cable/connection/base.rb
parent375b315da62d55b47074ab8cdde60eac4dfaef2a (diff)
downloadrails-e0926038983177f5491e45cc338e5dc091e3a86d.tar.gz
rails-e0926038983177f5491e45cc338e5dc091e3a86d.tar.bz2
rails-e0926038983177f5491e45cc338e5dc091e3a86d.zip
Wrap message queueing in a more welcoming API
Diffstat (limited to 'lib/action_cable/connection/base.rb')
-rw-r--r--lib/action_cable/connection/base.rb26
1 files changed, 21 insertions, 5 deletions
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index 995b0901ca..71e84aed99 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -14,9 +14,6 @@ module ActionCable
@server, @env = server, env
- @accept_messages = false
- @pending_messages = []
-
@logger = TaggedLoggerProxy.new(server.logger, tags: log_tags)
@heartbeat = ActionCable::Connection::Heartbeat.new(self)
@@ -38,10 +35,10 @@ module ActionCable
message = event.data
if message.is_a?(String)
- if @accept_messages
+ if accepting_messages?
worker_pool.async.invoke(self, :receive, message)
else
- @pending_messages << message
+ queue_message message
end
else
logger.error "Couldn't handle non-string message: #{message.class}"
@@ -117,10 +114,29 @@ module ActionCable
connect if respond_to?(:connect)
subscribe_to_internal_channel
+ ready_to_accept_messages
+ process_pending_messages
+ end
+
+
+ def accepting_messages?
+ @accept_messages
+ end
+
+ def ready_to_accept_messages
@accept_messages = true
+ end
+
+ def queue_message(message)
+ @pending_messages ||= []
+ @pending_messages << message
+ end
+
+ def process_pending_messages
worker_pool.async.invoke(self, :receive, @pending_messages.shift) until @pending_messages.empty?
end
+
def on_close
server.remove_connection(self)