aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/channel
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/channel')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb90
-rw-r--r--actioncable/lib/action_cable/channel/broadcasting.rb4
-rw-r--r--actioncable/lib/action_cable/channel/callbacks.rb2
-rw-r--r--actioncable/lib/action_cable/channel/naming.rb3
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb52
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb102
6 files changed, 182 insertions, 71 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 874ebe2e71..a866044f95 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -1,4 +1,4 @@
-require 'set'
+require "set"
module ActionCable
module Channel
@@ -32,8 +32,8 @@ module ActionCable
#
# == Action processing
#
- # Unlike subclasses of ActionController::Base, channels do not follow a REST
- # constraint form for their actions. Instead, ActionCable operates through a
+ # Unlike subclasses of ActionController::Base, channels do not follow a RESTful
+ # constraint form for their actions. Instead, Action Cable operates through a
# remote-procedure call model. You can declare any public method on the
# channel (optionally taking a <tt>data</tt> argument), and this method is
# automatically exposed as callable to the client.
@@ -63,10 +63,10 @@ module ActionCable
# end
# end
#
- # In this example, subscribed/unsubscribed are not callable methods, as they
+ # In this example, the subscribed and unsubscribed methods are not callable methods, as they
# were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
# and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
- # callable as it's a private method. You'll see that appear accepts a data
+ # callable, since it's a private method. You'll see that appear accepts a data
# parameter, which it then uses as part of its model call. <tt>#away</tt>
# does not, since it's simply a trigger action.
#
@@ -125,7 +125,7 @@ module ActionCable
protected
# action_methods are cached and there is sometimes need to refresh
# them. ::clear_action_methods! allows you to do that, so next time
- # you run action_methods, they will be recalculated
+ # you run action_methods, they will be recalculated.
def clear_action_methods!
@action_methods = nil
end
@@ -144,13 +144,14 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed.
- @defer_subscription_confirmation = false
+ #
+ # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
+ @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil
@subscription_confirmation_sent = nil
delegate_connection_identifiers
- subscribe_to_channel
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
@@ -160,21 +161,34 @@ module ActionCable
action = extract_action(data)
if processable_action?(action)
- dispatch_action(action, data)
+ payload = { channel_class: self.class.name, action: action, data: data }
+ ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
+ dispatch_action(action, data)
+ end
else
logger.error "Unable to process #{action_signature(action, data)}"
end
end
- # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
+ # This method is called after subscription has been added to the connection
+ # and confirms or rejects the subscription.
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ reject_subscription if subscription_rejected?
+ ensure_confirmation_sent
+ end
+
+ # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
- def unsubscribe_from_channel
+ def unsubscribe_from_channel # :nodoc:
run_callbacks :unsubscribe do
unsubscribed
end
end
-
protected
# Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams
# you want this channel to be sending to the subscriber.
@@ -183,7 +197,7 @@ module ActionCable
end
# Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
- # people as offline or the like.
+ # users as offline or the like.
def unsubscribed
# Override in subclasses
end
@@ -191,16 +205,26 @@ module ActionCable
# Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
- logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+ logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via }
+
+ payload = { channel_class: self.class.name, data: data, via: via }
+ ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
+ connection.transmit identifier: @identifier, message: data
+ end
+ end
+
+ def ensure_confirmation_sent
+ return if subscription_rejected?
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
end
def defer_subscription_confirmation!
- @defer_subscription_confirmation = true
+ @defer_subscription_confirmation_counter.increment
end
def defer_subscription_confirmation?
- @defer_subscription_confirmation
+ @defer_subscription_confirmation_counter.value > 0
end
def subscription_confirmation_sent?
@@ -224,26 +248,12 @@ module ActionCable
end
end
-
- def subscribe_to_channel
- run_callbacks :subscribe do
- subscribed
- end
-
- if subscription_rejected?
- reject_subscription
- else
- transmit_subscription_confirmation unless defer_subscription_confirmation?
- end
- end
-
-
def extract_action(data)
- (data['action'].presence || :receive).to_sym
+ (data["action"].presence || :receive).to_sym
end
def processable_action?(action)
- self.class.action_methods.include?(action.to_s)
+ self.class.action_methods.include?(action.to_s) unless subscription_rejected?
end
def dispatch_action(action, data)
@@ -258,7 +268,7 @@ module ActionCable
def action_signature(action, data)
"#{self.class.name}##{action}".tap do |signature|
- if (arguments = data.except('action')).any?
+ if (arguments = data.except("action")).any?
signature << "(#{arguments.inspect})"
end
end
@@ -267,8 +277,11 @@ module ActionCable
def transmit_subscription_confirmation
unless subscription_confirmation_sent?
logger.info "#{self.class.name} is transmitting the subscription confirmation"
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
- @subscription_confirmation_sent = true
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
+ @subscription_confirmation_sent = true
+ end
end
end
@@ -279,7 +292,10 @@ module ActionCable
def transmit_subscription_rejection
logger.info "#{self.class.name} is transmitting the subscription rejection"
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb
index afc23d7d1a..23ed4ec943 100644
--- a/actioncable/lib/action_cable/channel/broadcasting.rb
+++ b/actioncable/lib/action_cable/channel/broadcasting.rb
@@ -1,4 +1,4 @@
-require 'active_support/core_ext/object/to_param'
+require "active_support/core_ext/object/to_param"
module ActionCable
module Channel
@@ -16,7 +16,7 @@ module ActionCable
def broadcasting_for(model) #:nodoc:
case
when model.is_a?(Array)
- model.map { |m| broadcasting_for(m) }.join(':')
+ model.map { |m| broadcasting_for(m) }.join(":")
when model.respond_to?(:to_gid_param)
model.to_gid_param
else
diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb
index 295d750e86..c740132c94 100644
--- a/actioncable/lib/action_cable/channel/callbacks.rb
+++ b/actioncable/lib/action_cable/channel/callbacks.rb
@@ -1,4 +1,4 @@
-require 'active_support/callbacks'
+require "active_support/callbacks"
module ActionCable
module Channel
diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb
index 4c9d53b15a..b7e88bf73d 100644
--- a/actioncable/lib/action_cable/channel/naming.rb
+++ b/actioncable/lib/action_cable/channel/naming.rb
@@ -10,8 +10,9 @@ module ActionCable
#
# ChatChannel.channel_name # => 'chat'
# Chats::AppearancesChannel.channel_name # => 'chats:appearances'
+ # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances'
def channel_name
- @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore
+ @channel_name ||= name.sub(/Channel$/, "").gsub("::",":").underscore
end
end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 56597d02d7..c9daa0bcd3 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -12,11 +12,42 @@ module ActionCable
end
module ClassMethods
- # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
- # for sending a steady flow of updates to a client based off an object that was configured on subscription.
- # It's an alternative to using streams if the channel is able to do the work internally.
- def periodically(callback, every:)
- self.periodic_timers += [ [ callback, every: every ] ]
+ # Periodically performs a task on the channel, like updating an online
+ # user counter, polling a backend for new status messages, sending
+ # regular "heartbeat" messages, or doing some internal work and giving
+ # progress updates.
+ #
+ # Pass a method name or lambda argument or provide a block to call.
+ # Specify the calling period in seconds using the <tt>every:</tt>
+ # keyword argument.
+ #
+ # periodically :transmit_progress, every: 5.seconds
+ #
+ # periodically every: 3.minutes do
+ # transmit action: :update_count, count: current_count
+ # end
+ #
+ def periodically(callback_or_method_name = nil, every:, &block)
+ callback =
+ if block_given?
+ raise ArgumentError, "Pass a block or provide a callback arg, not both" if callback_or_method_name
+ block
+ else
+ case callback_or_method_name
+ when Proc
+ callback_or_method_name
+ when Symbol
+ -> { __send__ callback_or_method_name }
+ else
+ raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}"
+ end
+ end
+
+ unless every.kind_of?(Numeric) && every > 0
+ raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
+ end
+
+ self.periodic_timers += [[ callback, every: every ]]
end
end
@@ -27,14 +58,19 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
- connection.worker_pool.async_run_periodic_timer(self, callback)
- end
+ active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every))
+ end
+ end
+
+ def start_periodic_timer(callback, every:)
+ connection.server.event_loop.timer every do
+ connection.worker_pool.async_exec self, connection: connection, &callback
end
end
def stop_periodic_timers
active_periodic_timers.each { |timer| timer.shutdown }
+ active_periodic_timers.clear
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 3158f30814..dbba333353 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -1,8 +1,8 @@
module ActionCable
module Channel
- # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data
- # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
- # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later.
+ # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
+ # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
+ # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
# the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
@@ -18,13 +18,15 @@ module ActionCable
# end
# end
#
- # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there.
- # That looks like so from that side of things:
+ # Based on the above example, the subscribers of this channel will get whatever data is put into the,
+ # let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there.
+ #
+ # An example broadcasting for this channel looks like so:
#
# ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
#
# If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
- # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`
+ # The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>.
#
# class CommentsChannel < ApplicationCable::Channel
# def subscribed
@@ -37,16 +39,14 @@ module ActionCable
#
# CommentsChannel.broadcast_to(@post, @comment)
#
- # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out.
- # Example below shows how you can use this to provide performance introspection in the process:
+ # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out.
+ # The below example shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
- # stream_for @room, -> (encoded_message) do
- # message = ActiveSupport::JSON.decode(encoded_message)
- #
+ # stream_for @room, coder: ActiveSupport::JSON do |message|
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
@@ -69,16 +69,22 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
- def stream_from(broadcasting, callback = nil)
- # Hold off the confirmation until pubsub#subscribe is successful
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
+ def stream_from(broadcasting, callback = nil, coder: nil, &block)
+ broadcasting = String(broadcasting)
+
+ # Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- callback ||= default_stream_callback(broadcasting)
- streams << [ broadcasting, callback ]
+ # Build a stream handler by wrapping the user-provided callback with
+ # a decoder or defaulting to a JSON-decoding retransmitter.
+ handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
+ streams << [ broadcasting, handler ]
- Concurrent.global_io_executor.post do
- pubsub.subscribe(broadcasting, callback, lambda do
- transmit_subscription_confirmation
+ connection.server.event_loop.post do
+ pubsub.subscribe(broadcasting, handler, lambda do
+ ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
end
@@ -87,8 +93,11 @@ module ActionCable
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
- def stream_for(model, callback = nil)
- stream_from(broadcasting_for([ channel_name, model ]), callback)
+ #
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
+ def stream_for(model, callback = nil, coder: nil, &block)
+ stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end
# Unsubscribes all streams associated with this channel from the pubsub queue.
@@ -106,11 +115,60 @@ module ActionCable
@_streams ||= []
end
- def default_stream_callback(broadcasting)
+ # Always wrap the outermost handler to invoke the user handler on the
+ # worker pool rather than blocking the event loop.
+ def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
+ handler = stream_handler(broadcasting, user_handler, coder: coder)
+
+ -> message do
+ connection.worker_pool.async_invoke handler, :call, message, connection: connection
+ end
+ end
+
+ # May be overridden to add instrumentation, logging, specialized error
+ # handling, or other forms of handler decoration.
+ #
+ # TODO: Tests demonstrating this.
+ def stream_handler(broadcasting, user_handler, coder: nil)
+ if user_handler
+ stream_decoder user_handler, coder: coder
+ else
+ default_stream_handler broadcasting, coder: coder
+ end
+ end
+
+ # May be overridden to change the default stream handling behavior
+ # which decodes JSON and transmits to the client.
+ #
+ # TODO: Tests demonstrating this.
+ #
+ # TODO: Room for optimization. Update transmit API to be coder-aware
+ # so we can no-op when pubsub and connection are both JSON-encoded.
+ # Then we can skip decode+encode if we're just proxying messages.
+ def default_stream_handler(broadcasting, coder:)
+ coder ||= ActiveSupport::JSON
+ stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
+ end
+
+ def stream_decoder(handler = identity_handler, coder:)
+ if coder
+ -> message { handler.(coder.decode(message)) }
+ else
+ handler
+ end
+ end
+
+ def stream_transmitter(handler = identity_handler, broadcasting:)
+ via = "streamed from #{broadcasting}"
+
-> (message) do
- transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
+ transmit handler.(message), via: via
end
end
+
+ def identity_handler
+ -> message { message }
+ end
end
end
end