aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/channel
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/channel')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb309
-rw-r--r--actioncable/lib/action_cable/channel/broadcasting.rb31
-rw-r--r--actioncable/lib/action_cable/channel/callbacks.rb37
-rw-r--r--actioncable/lib/action_cable/channel/naming.rb25
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb78
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb176
-rw-r--r--actioncable/lib/action_cable/channel/test_case.rb271
7 files changed, 927 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
new file mode 100644
index 0000000000..ad0d3685cd
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -0,0 +1,309 @@
+# frozen_string_literal: true
+
+require "set"
+require "active_support/rescuable"
+
+module ActionCable
+ module Channel
+ # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
+ # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
+ # responding to the subscriber's direct requests.
+ #
+ # Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then
+ # lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care
+ # not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released
+ # as is normally the case with a controller instance that gets thrown away after every request.
+ #
+ # Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user
+ # record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it.
+ #
+ # The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests
+ # can interact with. Here's a quick example:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # end
+ #
+ # def speak(data)
+ # @room.speak data, user: current_user
+ # end
+ # end
+ #
+ # The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that
+ # subscriber wants to say something in the room.
+ #
+ # == Action processing
+ #
+ # Unlike subclasses of ActionController::Base, channels do not follow a RESTful
+ # constraint form for their actions. Instead, Action Cable operates through a
+ # remote-procedure call model. You can declare any public method on the
+ # channel (optionally taking a <tt>data</tt> argument), and this method is
+ # automatically exposed as callable to the client.
+ #
+ # Example:
+ #
+ # class AppearanceChannel < ApplicationCable::Channel
+ # def subscribed
+ # @connection_token = generate_connection_token
+ # end
+ #
+ # def unsubscribed
+ # current_user.disappear @connection_token
+ # end
+ #
+ # def appear(data)
+ # current_user.appear @connection_token, on: data['appearing_on']
+ # end
+ #
+ # def away
+ # current_user.away @connection_token
+ # end
+ #
+ # private
+ # def generate_connection_token
+ # SecureRandom.hex(36)
+ # end
+ # end
+ #
+ # In this example, the subscribed and unsubscribed methods are not callable methods, as they
+ # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
+ # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
+ # callable, since it's a private method. You'll see that appear accepts a data
+ # parameter, which it then uses as part of its model call. <tt>#away</tt>
+ # does not, since it's simply a trigger action.
+ #
+ # Also note that in this example, <tt>current_user</tt> is available because
+ # it was marked as an identifying attribute on the connection. All such
+ # identifiers will automatically create a delegation method of the same name
+ # on the channel instance.
+ #
+ # == Rejecting subscription requests
+ #
+ # A channel can reject a subscription request in the #subscribed callback by
+ # invoking the #reject method:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # reject unless current_user.can_access?(@room)
+ # end
+ # end
+ #
+ # In this example, the subscription will be rejected if the
+ # <tt>current_user</tt> does not have access to the chat room. On the
+ # client-side, the <tt>Channel#rejected</tt> callback will get invoked when
+ # the server rejects the subscription request.
+ class Base
+ include Callbacks
+ include PeriodicTimers
+ include Streams
+ include Naming
+ include Broadcasting
+ include ActiveSupport::Rescuable
+
+ attr_reader :params, :connection, :identifier
+ delegate :logger, to: :connection
+
+ class << self
+ # A list of method names that should be considered actions. This
+ # includes all public instance methods on a channel, less
+ # any internal methods (defined on Base), adding back in
+ # any methods that are internal, but still exist on the class
+ # itself.
+ #
+ # ==== Returns
+ # * <tt>Set</tt> - A set of all methods that should be considered actions.
+ def action_methods
+ @action_methods ||= begin
+ # All public instance methods of this class, including ancestors
+ methods = (public_instance_methods(true) -
+ # Except for public instance methods of Base and its ancestors
+ ActionCable::Channel::Base.public_instance_methods(true) +
+ # Be sure to include shadowed public instance methods of this class
+ public_instance_methods(false)).uniq.map(&:to_s)
+ methods.to_set
+ end
+ end
+
+ private
+ # action_methods are cached and there is sometimes need to refresh
+ # them. ::clear_action_methods! allows you to do that, so next time
+ # you run action_methods, they will be recalculated.
+ def clear_action_methods! # :doc:
+ @action_methods = nil
+ end
+
+ # Refresh the cached action_methods when a new action_method is added.
+ def method_added(name) # :doc:
+ super
+ clear_action_methods!
+ end
+ end
+
+ def initialize(connection, identifier, params = {})
+ @connection = connection
+ @identifier = identifier
+ @params = params
+
+ # When a channel is streaming via pubsub, we want to delay the confirmation
+ # transmission until pubsub subscription is confirmed.
+ #
+ # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
+ @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
+
+ @reject_subscription = nil
+ @subscription_confirmation_sent = nil
+
+ delegate_connection_identifiers
+ end
+
+ # Extract the action name from the passed data and process it via the channel. The process will ensure
+ # that the action requested is a public method on the channel declared by the user (so not one of the callbacks
+ # like #subscribed).
+ def perform_action(data)
+ action = extract_action(data)
+
+ if processable_action?(action)
+ payload = { channel_class: self.class.name, action: action, data: data }
+ ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
+ dispatch_action(action, data)
+ end
+ else
+ logger.error "Unable to process #{action_signature(action, data)}"
+ end
+ end
+
+ # This method is called after subscription has been added to the connection
+ # and confirms or rejects the subscription.
+ def subscribe_to_channel
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ reject_subscription if subscription_rejected?
+ ensure_confirmation_sent
+ end
+
+ # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
+ # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
+ def unsubscribe_from_channel # :nodoc:
+ run_callbacks :unsubscribe do
+ unsubscribed
+ end
+ end
+
+ private
+ # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams
+ # you want this channel to be sending to the subscriber.
+ def subscribed # :doc:
+ # Override in subclasses
+ end
+
+ # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
+ # users as offline or the like.
+ def unsubscribed # :doc:
+ # Override in subclasses
+ end
+
+ # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
+ # the proper channel identifier marked as the recipient.
+ def transmit(data, via: nil) # :doc:
+ status = "#{self.class.name} transmitting #{data.inspect.truncate(300)}"
+ status += " (via #{via})" if via
+ logger.debug(status)
+
+ payload = { channel_class: self.class.name, data: data, via: via }
+ ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
+ connection.transmit identifier: @identifier, message: data
+ end
+ end
+
+ def ensure_confirmation_sent # :doc:
+ return if subscription_rejected?
+ @defer_subscription_confirmation_counter.decrement
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
+
+ def defer_subscription_confirmation! # :doc:
+ @defer_subscription_confirmation_counter.increment
+ end
+
+ def defer_subscription_confirmation? # :doc:
+ @defer_subscription_confirmation_counter.value > 0
+ end
+
+ def subscription_confirmation_sent? # :doc:
+ @subscription_confirmation_sent
+ end
+
+ def reject # :doc:
+ @reject_subscription = true
+ end
+
+ def subscription_rejected? # :doc:
+ @reject_subscription
+ end
+
+ def delegate_connection_identifiers
+ connection.identifiers.each do |identifier|
+ define_singleton_method(identifier) do
+ connection.send(identifier)
+ end
+ end
+ end
+
+ def extract_action(data)
+ (data["action"].presence || :receive).to_sym
+ end
+
+ def processable_action?(action)
+ self.class.action_methods.include?(action.to_s) unless subscription_rejected?
+ end
+
+ def dispatch_action(action, data)
+ logger.info action_signature(action, data)
+
+ if method(action).arity == 1
+ public_send action, data
+ else
+ public_send action
+ end
+ rescue Exception => exception
+ rescue_with_handler(exception) || raise
+ end
+
+ def action_signature(action, data)
+ (+"#{self.class.name}##{action}").tap do |signature|
+ if (arguments = data.except("action")).any?
+ signature << "(#{arguments.inspect})"
+ end
+ end
+ end
+
+ def transmit_subscription_confirmation
+ unless subscription_confirmation_sent?
+ logger.info "#{self.class.name} is transmitting the subscription confirmation"
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
+ @subscription_confirmation_sent = true
+ end
+ end
+ end
+
+ def reject_subscription
+ connection.subscriptions.remove_subscription self
+ transmit_subscription_rejection
+ end
+
+ def transmit_subscription_rejection
+ logger.info "#{self.class.name} is transmitting the subscription rejection"
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb
new file mode 100644
index 0000000000..9a96720f4a
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/broadcasting.rb
@@ -0,0 +1,31 @@
+# frozen_string_literal: true
+
+require "active_support/core_ext/object/to_param"
+
+module ActionCable
+ module Channel
+ module Broadcasting
+ extend ActiveSupport::Concern
+
+ delegate :broadcasting_for, to: :class
+
+ module ClassMethods
+ # Broadcast a hash to a unique broadcasting for this <tt>model</tt> in this channel.
+ def broadcast_to(model, message)
+ ActionCable.server.broadcast(broadcasting_for([ channel_name, model ]), message)
+ end
+
+ def broadcasting_for(model) #:nodoc:
+ case
+ when model.is_a?(Array)
+ model.map { |m| broadcasting_for(m) }.join(":")
+ when model.respond_to?(:to_gid_param)
+ model.to_gid_param
+ else
+ model.to_param
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb
new file mode 100644
index 0000000000..e4cb19b26a
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/callbacks.rb
@@ -0,0 +1,37 @@
+# frozen_string_literal: true
+
+require "active_support/callbacks"
+
+module ActionCable
+ module Channel
+ module Callbacks
+ extend ActiveSupport::Concern
+ include ActiveSupport::Callbacks
+
+ included do
+ define_callbacks :subscribe
+ define_callbacks :unsubscribe
+ end
+
+ module ClassMethods
+ def before_subscribe(*methods, &block)
+ set_callback(:subscribe, :before, *methods, &block)
+ end
+
+ def after_subscribe(*methods, &block)
+ set_callback(:subscribe, :after, *methods, &block)
+ end
+ alias_method :on_subscribe, :after_subscribe
+
+ def before_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :before, *methods, &block)
+ end
+
+ def after_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :after, *methods, &block)
+ end
+ alias_method :on_unsubscribe, :after_unsubscribe
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb
new file mode 100644
index 0000000000..9c324a2a53
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/naming.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Channel
+ module Naming
+ extend ActiveSupport::Concern
+
+ module ClassMethods
+ # Returns the name of the channel, underscored, without the <tt>Channel</tt> ending.
+ # If the channel is in a namespace, then the namespaces are represented by single
+ # colon separators in the channel name.
+ #
+ # ChatChannel.channel_name # => 'chat'
+ # Chats::AppearancesChannel.channel_name # => 'chats:appearances'
+ # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances'
+ def channel_name
+ @channel_name ||= name.sub(/Channel$/, "").gsub("::", ":").underscore
+ end
+ end
+
+ # Delegates to the class' <tt>channel_name</tt>
+ delegate :channel_name, to: :class
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
new file mode 100644
index 0000000000..830b3efa3c
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -0,0 +1,78 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Channel
+ module PeriodicTimers
+ extend ActiveSupport::Concern
+
+ included do
+ class_attribute :periodic_timers, instance_reader: false, default: []
+
+ after_subscribe :start_periodic_timers
+ after_unsubscribe :stop_periodic_timers
+ end
+
+ module ClassMethods
+ # Periodically performs a task on the channel, like updating an online
+ # user counter, polling a backend for new status messages, sending
+ # regular "heartbeat" messages, or doing some internal work and giving
+ # progress updates.
+ #
+ # Pass a method name or lambda argument or provide a block to call.
+ # Specify the calling period in seconds using the <tt>every:</tt>
+ # keyword argument.
+ #
+ # periodically :transmit_progress, every: 5.seconds
+ #
+ # periodically every: 3.minutes do
+ # transmit action: :update_count, count: current_count
+ # end
+ #
+ def periodically(callback_or_method_name = nil, every:, &block)
+ callback =
+ if block_given?
+ raise ArgumentError, "Pass a block or provide a callback arg, not both" if callback_or_method_name
+ block
+ else
+ case callback_or_method_name
+ when Proc
+ callback_or_method_name
+ when Symbol
+ -> { __send__ callback_or_method_name }
+ else
+ raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}"
+ end
+ end
+
+ unless every.kind_of?(Numeric) && every > 0
+ raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
+ end
+
+ self.periodic_timers += [[ callback, every: every ]]
+ end
+ end
+
+ private
+ def active_periodic_timers
+ @active_periodic_timers ||= []
+ end
+
+ def start_periodic_timers
+ self.class.periodic_timers.each do |callback, options|
+ active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every))
+ end
+ end
+
+ def start_periodic_timer(callback, every:)
+ connection.server.event_loop.timer every do
+ connection.worker_pool.async_exec self, connection: connection, &callback
+ end
+ end
+
+ def stop_periodic_timers
+ active_periodic_timers.each { |timer| timer.shutdown }
+ active_periodic_timers.clear
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
new file mode 100644
index 0000000000..81c2c38064
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -0,0 +1,176 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module Channel
+ # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
+ # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
+ # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
+ #
+ # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
+ # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
+ # comments on a given page:
+ #
+ # class CommentsChannel < ApplicationCable::Channel
+ # def follow(data)
+ # stream_from "comments_for_#{data['recording_id']}"
+ # end
+ #
+ # def unfollow
+ # stop_all_streams
+ # end
+ # end
+ #
+ # Based on the above example, the subscribers of this channel will get whatever data is put into the,
+ # let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there.
+ #
+ # An example broadcasting for this channel looks like so:
+ #
+ # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
+ #
+ # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
+ # The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>.
+ #
+ # class CommentsChannel < ApplicationCable::Channel
+ # def subscribed
+ # post = Post.find(params[:id])
+ # stream_for post
+ # end
+ # end
+ #
+ # You can then broadcast to this channel using:
+ #
+ # CommentsChannel.broadcast_to(@post, @comment)
+ #
+ # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out.
+ # The below example shows how you can use this to provide performance introspection in the process:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ #
+ # stream_for @room, coder: ActiveSupport::JSON do |message|
+ # if message['originated_at'].present?
+ # elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
+ #
+ # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
+ # logger.info "Message took #{elapsed_time}s to arrive"
+ # end
+ #
+ # transmit message
+ # end
+ # end
+ # end
+ #
+ # You can stop streaming from all broadcasts by calling #stop_all_streams.
+ module Streams
+ extend ActiveSupport::Concern
+
+ included do
+ on_unsubscribe :stop_all_streams
+ end
+
+ # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
+ # instead of the default of just transmitting the updates straight to the subscriber.
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
+ def stream_from(broadcasting, callback = nil, coder: nil, &block)
+ broadcasting = String(broadcasting)
+
+ # Don't send the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+
+ # Build a stream handler by wrapping the user-provided callback with
+ # a decoder or defaulting to a JSON-decoding retransmitter.
+ handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
+ streams << [ broadcasting, handler ]
+
+ connection.server.event_loop.post do
+ pubsub.subscribe(broadcasting, handler, lambda do
+ ensure_confirmation_sent
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end)
+ end
+ end
+
+ # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
+ # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
+ # to the subscriber.
+ #
+ # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback.
+ # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages.
+ def stream_for(model, callback = nil, coder: nil, &block)
+ stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
+ end
+
+ # Unsubscribes all streams associated with this channel from the pubsub queue.
+ def stop_all_streams
+ streams.each do |broadcasting, callback|
+ pubsub.unsubscribe broadcasting, callback
+ logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
+ end.clear
+ end
+
+ private
+ delegate :pubsub, to: :connection
+
+ def streams
+ @_streams ||= []
+ end
+
+ # Always wrap the outermost handler to invoke the user handler on the
+ # worker pool rather than blocking the event loop.
+ def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
+ handler = stream_handler(broadcasting, user_handler, coder: coder)
+
+ -> message do
+ connection.worker_pool.async_invoke handler, :call, message, connection: connection
+ end
+ end
+
+ # May be overridden to add instrumentation, logging, specialized error
+ # handling, or other forms of handler decoration.
+ #
+ # TODO: Tests demonstrating this.
+ def stream_handler(broadcasting, user_handler, coder: nil)
+ if user_handler
+ stream_decoder user_handler, coder: coder
+ else
+ default_stream_handler broadcasting, coder: coder
+ end
+ end
+
+ # May be overridden to change the default stream handling behavior
+ # which decodes JSON and transmits to the client.
+ #
+ # TODO: Tests demonstrating this.
+ #
+ # TODO: Room for optimization. Update transmit API to be coder-aware
+ # so we can no-op when pubsub and connection are both JSON-encoded.
+ # Then we can skip decode+encode if we're just proxying messages.
+ def default_stream_handler(broadcasting, coder:)
+ coder ||= ActiveSupport::JSON
+ stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
+ end
+
+ def stream_decoder(handler = identity_handler, coder:)
+ if coder
+ -> message { handler.(coder.decode(message)) }
+ else
+ handler
+ end
+ end
+
+ def stream_transmitter(handler = identity_handler, broadcasting:)
+ via = "streamed from #{broadcasting}"
+
+ -> (message) do
+ transmit handler.(message), via: via
+ end
+ end
+
+ def identity_handler
+ -> message { message }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/channel/test_case.rb b/actioncable/lib/action_cable/channel/test_case.rb
new file mode 100644
index 0000000000..dd2762ccd0
--- /dev/null
+++ b/actioncable/lib/action_cable/channel/test_case.rb
@@ -0,0 +1,271 @@
+# frozen_string_literal: true
+
+require "active_support"
+require "active_support/test_case"
+require "active_support/core_ext/hash/indifferent_access"
+require "json"
+
+module ActionCable
+ module Channel
+ class NonInferrableChannelError < ::StandardError
+ def initialize(name)
+ super "Unable to determine the channel to test from #{name}. " +
+ "You'll need to specify it using `tests YourChannel` in your " +
+ "test case definition."
+ end
+ end
+
+ # Stub `stream_from` to track streams for the channel.
+ # Add public aliases for `subscription_confirmation_sent?` and
+ # `subscription_rejected?`.
+ module ChannelStub
+ def confirmed?
+ subscription_confirmation_sent?
+ end
+
+ def rejected?
+ subscription_rejected?
+ end
+
+ def stream_from(broadcasting, *)
+ streams << broadcasting
+ end
+
+ def stop_all_streams
+ @_streams = []
+ end
+
+ def streams
+ @_streams ||= []
+ end
+
+ # Make periodic timers no-op
+ def start_periodic_timers; end
+ alias stop_periodic_timers start_periodic_timers
+ end
+
+ class ConnectionStub
+ attr_reader :transmissions, :identifiers, :subscriptions, :logger
+
+ def initialize(identifiers = {})
+ @transmissions = []
+
+ identifiers.each do |identifier, val|
+ define_singleton_method(identifier) { val }
+ end
+
+ @subscriptions = ActionCable::Connection::Subscriptions.new(self)
+ @identifiers = identifiers.keys
+ @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
+ end
+
+ def transmit(cable_message)
+ transmissions << cable_message.with_indifferent_access
+ end
+ end
+
+ # Superclass for Action Cable channel functional tests.
+ #
+ # == Basic example
+ #
+ # Functional tests are written as follows:
+ # 1. First, one uses the +subscribe+ method to simulate subscription creation.
+ # 2. Then, one asserts whether the current state is as expected. "State" can be anything:
+ # transmitted messages, subscribed streams, etc.
+ #
+ # For example:
+ #
+ # class ChatChannelTest < ActionCable::Channel::TestCase
+ # def test_subscribed_with_room_number
+ # # Simulate a subscription creation
+ # subscribe room_number: 1
+ #
+ # # Asserts that the subscription was successfully created
+ # assert subscription.confirmed?
+ #
+ # # Asserts that the channel subscribes connection to a stream
+ # assert_equal "chat_1", streams.last
+ # end
+ #
+ # def test_does_not_subscribe_without_room_number
+ # subscribe
+ #
+ # # Asserts that the subscription was rejected
+ # assert subscription.rejected?
+ # end
+ # end
+ #
+ # You can also perform actions:
+ # def test_perform_speak
+ # subscribe room_number: 1
+ #
+ # perform :speak, message: "Hello, Rails!"
+ #
+ # assert_equal "Hello, Rails!", transmissions.last["text"]
+ # end
+ #
+ # == Special methods
+ #
+ # ActionCable::Channel::TestCase will also automatically provide the following instance
+ # methods for use in the tests:
+ #
+ # <b>connection</b>::
+ # An ActionCable::Channel::ConnectionStub, representing the current HTTP connection.
+ # <b>subscription</b>::
+ # An instance of the current channel, created when you call `subscribe`.
+ # <b>transmissions</b>::
+ # A list of all messages that have been transmitted into the channel.
+ # <b>streams</b>::
+ # A list of all created streams subscriptions (as identifiers) for the subscription.
+ #
+ #
+ # == Channel is automatically inferred
+ #
+ # ActionCable::Channel::TestCase will automatically infer the channel under test
+ # from the test class name. If the channel cannot be inferred from the test
+ # class name, you can explicitly set it with +tests+.
+ #
+ # class SpecialEdgeCaseChannelTest < ActionCable::Channel::TestCase
+ # tests SpecialChannel
+ # end
+ #
+ # == Specifying connection identifiers
+ #
+ # You need to set up your connection manually to provide values for the identifiers.
+ # To do this just use:
+ #
+ # stub_connection(user: users[:john])
+ #
+ # == Testing broadcasting
+ #
+ # ActionCable::Channel::TestCase enhances ActionCable::TestHelper assertions (e.g.
+ # +assert_broadcasts+) to handle broadcasting to models:
+ #
+ #
+ # # in your channel
+ # def speak(data)
+ # broadcast_to room, text: data["message"]
+ # end
+ #
+ # def test_speak
+ # subscribe room_id: rooms[:chat].id
+ #
+ # assert_broadcasts_on(rooms[:chat], text: "Hello, Rails!") do
+ # perform :speak, message: "Hello, Rails!"
+ # end
+ # end
+ class TestCase < ActiveSupport::TestCase
+ module Behavior
+ extend ActiveSupport::Concern
+
+ include ActiveSupport::Testing::ConstantLookup
+ include ActionCable::TestHelper
+
+ CHANNEL_IDENTIFIER = "test_stub"
+
+ included do
+ class_attribute :_channel_class
+
+ attr_reader :connection, :subscription
+ delegate :streams, to: :subscription
+
+ ActiveSupport.run_load_hooks(:action_cable_channel_test_case, self)
+ end
+
+ module ClassMethods
+ def tests(channel)
+ case channel
+ when String, Symbol
+ self._channel_class = channel.to_s.camelize.constantize
+ when Module
+ self._channel_class = channel
+ else
+ raise NonInferrableChannelError.new(channel)
+ end
+ end
+
+ def channel_class
+ if channel = self._channel_class
+ channel
+ else
+ tests determine_default_channel(name)
+ end
+ end
+
+ def determine_default_channel(name)
+ channel = determine_constant_from_test_name(name) do |constant|
+ Class === constant && constant < ActionCable::Channel::Base
+ end
+ raise NonInferrableChannelError.new(name) if channel.nil?
+ channel
+ end
+ end
+
+ # Setup test connection with the specified identifiers:
+ #
+ # class ApplicationCable < ActionCable::Connection::Base
+ # identified_by :user, :token
+ # end
+ #
+ # stub_connection(user: users[:john], token: 'my-secret-token')
+ def stub_connection(identifiers = {})
+ @connection = ConnectionStub.new(identifiers)
+ end
+
+ # Subscribe to the channel under test. Optionally pass subscription parameters as a Hash.
+ def subscribe(params = {})
+ @connection ||= stub_connection
+ @subscription = self.class.channel_class.new(connection, CHANNEL_IDENTIFIER, params.with_indifferent_access)
+ @subscription.singleton_class.include(ChannelStub)
+ @subscription.subscribe_to_channel
+ @subscription
+ end
+
+ # Unsubscribe the subscription under test.
+ def unsubscribe
+ check_subscribed!
+ subscription.unsubscribe_from_channel
+ end
+
+ # Perform action on a channel.
+ #
+ # NOTE: Must be subscribed.
+ def perform(action, data = {})
+ check_subscribed!
+ subscription.perform_action(data.stringify_keys.merge("action" => action.to_s))
+ end
+
+ # Returns messages transmitted into channel
+ def transmissions
+ # Return only directly sent message (via #transmit)
+ connection.transmissions.map { |data| data["message"] }.compact
+ end
+
+ # Enhance TestHelper assertions to handle non-String
+ # broadcastings
+ def assert_broadcasts(stream_or_object, *args)
+ super(broadcasting_for(stream_or_object), *args)
+ end
+
+ def assert_broadcast_on(stream_or_object, *args)
+ super(broadcasting_for(stream_or_object), *args)
+ end
+
+ private
+ def check_subscribed!
+ raise "Must be subscribed!" if subscription.nil? || subscription.rejected?
+ end
+
+ def broadcasting_for(stream_or_object)
+ return stream_or_object if stream_or_object.is_a?(String)
+
+ self.class.channel_class.broadcasting_for(
+ [self.class.channel_class.channel_name, stream_or_object]
+ )
+ end
+ end
+
+ include Behavior
+ end
+ end
+end