diff options
Diffstat (limited to 'actioncable/lib/action_cable/channel')
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 80 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/broadcasting.rb | 6 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/callbacks.rb | 4 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/naming.rb | 5 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/periodic_timers.rb | 11 | ||||
-rw-r--r-- | actioncable/lib/action_cable/channel/streams.rb | 18 |
6 files changed, 70 insertions, 54 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 845b747fc5..c5ad749bfe 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -1,4 +1,6 @@ -require 'set' +# frozen_string_literal: true + +require "set" module ActionCable module Channel @@ -122,16 +124,16 @@ module ActionCable end end - protected + private # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time # you run action_methods, they will be recalculated. - def clear_action_methods! + def clear_action_methods! # :doc: @action_methods = nil end # Refresh the cached action_methods when a new action_method is added. - def method_added(name) + def method_added(name) # :doc: super clear_action_methods! end @@ -144,13 +146,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +172,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -177,24 +191,25 @@ module ActionCable end end - - protected + private # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams # you want this channel to be sending to the subscriber. - def subscribed + def subscribed # :doc: # Override in subclasses end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking # users as offline or the like. - def unsubscribed + def unsubscribed # :doc: # Override in subclasses end # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. - def transmit(data, via: nil) - logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } + def transmit(data, via: nil) # :doc: + status = "#{self.class.name} transmitting #{data.inspect.truncate(300)}" + status += " (via #{via})" if via + logger.debug(status) payload = { channel_class: self.class.name, data: data, via: via } ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do @@ -202,27 +217,32 @@ module ActionCable end end - def defer_subscription_confirmation! - @defer_subscription_confirmation = true + def ensure_confirmation_sent # :doc: + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? end - def defer_subscription_confirmation? - @defer_subscription_confirmation + def defer_subscription_confirmation! # :doc: + @defer_subscription_confirmation_counter.increment end - def subscription_confirmation_sent? + def defer_subscription_confirmation? # :doc: + @defer_subscription_confirmation_counter.value > 0 + end + + def subscription_confirmation_sent? # :doc: @subscription_confirmation_sent end - def reject + def reject # :doc: @reject_subscription = true end - def subscription_rejected? + def subscription_rejected? # :doc: @reject_subscription end - private def delegate_connection_identifiers connection.identifiers.each do |identifier| define_singleton_method(identifier) do @@ -231,24 +251,12 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) - (data['action'].presence || :receive).to_sym + (data["action"].presence || :receive).to_sym end def processable_action?(action) - self.class.action_methods.include?(action.to_s) + self.class.action_methods.include?(action.to_s) unless subscription_rejected? end def dispatch_action(action, data) @@ -262,8 +270,8 @@ module ActionCable end def action_signature(action, data) - "#{self.class.name}##{action}".tap do |signature| - if (arguments = data.except('action')).any? + "#{self.class.name}##{action}".dup.tap do |signature| + if (arguments = data.except("action")).any? signature << "(#{arguments.inspect})" end end diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb index afc23d7d1a..acc791817b 100644 --- a/actioncable/lib/action_cable/channel/broadcasting.rb +++ b/actioncable/lib/action_cable/channel/broadcasting.rb @@ -1,4 +1,6 @@ -require 'active_support/core_ext/object/to_param' +# frozen_string_literal: true + +require "active_support/core_ext/object/to_param" module ActionCable module Channel @@ -16,7 +18,7 @@ module ActionCable def broadcasting_for(model) #:nodoc: case when model.is_a?(Array) - model.map { |m| broadcasting_for(m) }.join(':') + model.map { |m| broadcasting_for(m) }.join(":") when model.respond_to?(:to_gid_param) model.to_gid_param else diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb index 295d750e86..4223c0d996 100644 --- a/actioncable/lib/action_cable/channel/callbacks.rb +++ b/actioncable/lib/action_cable/channel/callbacks.rb @@ -1,4 +1,6 @@ -require 'active_support/callbacks' +# frozen_string_literal: true + +require "active_support/callbacks" module ActionCable module Channel diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb index 4c9d53b15a..03a5dcd3a0 100644 --- a/actioncable/lib/action_cable/channel/naming.rb +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module ActionCable module Channel module Naming @@ -10,8 +12,9 @@ module ActionCable # # ChatChannel.channel_name # => 'chat' # Chats::AppearancesChannel.channel_name # => 'chats:appearances' + # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances' def channel_name - @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore + @channel_name ||= name.sub(/Channel$/, "").gsub("::", ":").underscore end end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index dab604440f..830b3efa3c 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -1,11 +1,12 @@ +# frozen_string_literal: true + module ActionCable module Channel module PeriodicTimers extend ActiveSupport::Concern included do - class_attribute :periodic_timers, instance_reader: false - self.periodic_timers = [] + class_attribute :periodic_timers, instance_reader: false, default: [] after_subscribe :start_periodic_timers after_unsubscribe :stop_periodic_timers @@ -30,7 +31,7 @@ module ActionCable def periodically(callback_or_method_name = nil, every:, &block) callback = if block_given? - raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name + raise ArgumentError, "Pass a block or provide a callback arg, not both" if callback_or_method_name block else case callback_or_method_name @@ -64,9 +65,7 @@ module ActionCable def start_periodic_timer(callback, every:) connection.server.event_loop.timer every do - connection.worker_pool.async_invoke connection do - instance_exec(&callback) - end + connection.worker_pool.async_exec self, connection: connection, &callback end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 200c9d053c..81c2c38064 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -1,3 +1,5 @@ +# frozen_string_literal: true + module ActionCable module Channel # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data @@ -19,14 +21,14 @@ module ActionCable # end # # Based on the above example, the subscribers of this channel will get whatever data is put into the, - # let's say, `comments_for_45` broadcasting as soon as it's put there. + # let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there. # # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. - # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE` + # The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>. # # class CommentsChannel < ApplicationCable::Channel # def subscribed @@ -69,8 +71,8 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) @@ -84,7 +86,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end @@ -94,8 +96,8 @@ module ActionCable # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. # - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end @@ -138,7 +140,7 @@ module ActionCable end # May be overridden to change the default stream handling behavior - # which decodes JSON and transmits to client. + # which decodes JSON and transmits to the client. # # TODO: Tests demonstrating this. # |