diff options
Diffstat (limited to 'actioncable/lib/action_cable/channel/base.rb')
-rw-r--r-- | actioncable/lib/action_cable/channel/base.rb | 277 |
1 files changed, 277 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb new file mode 100644 index 0000000000..ce9d62635c --- /dev/null +++ b/actioncable/lib/action_cable/channel/base.rb @@ -0,0 +1,277 @@ +require 'set' + +module ActionCable + module Channel + # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection. + # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply + # responding to the subscriber's direct requests. + # + # Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then + # lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care + # not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released + # as is normally the case with a controller instance that gets thrown away after every request. + # + # Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user + # record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it. + # + # The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests + # can interact with. Here's a quick example: + # + # class ChatChannel < ApplicationCable::Channel + # def subscribed + # @room = Chat::Room[params[:room_number]] + # end + # + # def speak(data) + # @room.speak data, user: current_user + # end + # end + # + # The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that + # subscriber wants to say something in the room. + # + # == Action processing + # + # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can + # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client. + # + # Example: + # + # class AppearanceChannel < ApplicationCable::Channel + # def subscribed + # @connection_token = generate_connection_token + # end + # + # def unsubscribed + # current_user.disappear @connection_token + # end + # + # def appear(data) + # current_user.appear @connection_token, on: data['appearing_on'] + # end + # + # def away + # current_user.away @connection_token + # end + # + # private + # def generate_connection_token + # SecureRandom.hex(36) + # end + # end + # + # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away + # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then + # uses as part of its model call. #away does not, it's simply a trigger action. + # + # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. + # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # + # == Rejecting subscription requests + # + # A channel can reject a subscription request in the #subscribed callback by invoking #reject! + # + # Example: + # + # class ChatChannel < ApplicationCable::Channel + # def subscribed + # @room = Chat::Room[params[:room_number]] + # reject unless current_user.can_access?(@room) + # end + # end + # + # In this example, the subscription will be rejected if the current_user does not have access to the chat room. + # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. + class Base + include Callbacks + include PeriodicTimers + include Streams + include Naming + include Broadcasting + + attr_reader :params, :connection, :identifier + delegate :logger, to: :connection + + class << self + # A list of method names that should be considered actions. This + # includes all public instance methods on a channel, less + # any internal methods (defined on Base), adding back in + # any methods that are internal, but still exist on the class + # itself. + # + # ==== Returns + # * <tt>Set</tt> - A set of all methods that should be considered actions. + def action_methods + @action_methods ||= begin + # All public instance methods of this class, including ancestors + methods = (public_instance_methods(true) - + # Except for public instance methods of Base and its ancestors + ActionCable::Channel::Base.public_instance_methods(true) + + # Be sure to include shadowed public instance methods of this class + public_instance_methods(false)).uniq.map(&:to_s) + methods.to_set + end + end + + protected + # action_methods are cached and there is sometimes need to refresh + # them. ::clear_action_methods! allows you to do that, so next time + # you run action_methods, they will be recalculated + def clear_action_methods! + @action_methods = nil + end + + # Refresh the cached action_methods when a new action_method is added. + def method_added(name) + super + clear_action_methods! + end + end + + def initialize(connection, identifier, params = {}) + @connection = connection + @identifier = identifier + @params = params + + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + + @reject_subscription = nil + @subscription_confirmation_sent = nil + + delegate_connection_identifiers + subscribe_to_channel + end + + # Extract the action name from the passed data and process it via the channel. The process will ensure + # that the action requested is a public method on the channel declared by the user (so not one of the callbacks + # like #subscribed). + def perform_action(data) + action = extract_action(data) + + if processable_action?(action) + dispatch_action(action, data) + else + logger.error "Unable to process #{action_signature(action, data)}" + end + end + + # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. + def unsubscribe_from_channel + run_callbacks :unsubscribe do + unsubscribed + end + end + + + protected + # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams + # you want this channel to be sending to the subscriber. + def subscribed + # Override in subclasses + end + + # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking + # people as offline or the like. + def unsubscribed + # Override in subclasses + end + + # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with + # the proper channel identifier marked as the recipient. + def transmit(data, via: nil) + logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + end + + def defer_subscription_confirmation! + @defer_subscription_confirmation = true + end + + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + + def subscription_confirmation_sent? + @subscription_confirmation_sent + end + + def reject + @reject_subscription = true + end + + def subscription_rejected? + @reject_subscription + end + + private + def delegate_connection_identifiers + connection.identifiers.each do |identifier| + define_singleton_method(identifier) do + connection.send(identifier) + end + end + end + + + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + if subscription_rejected? + reject_subscription + else + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + end + + + def extract_action(data) + (data['action'].presence || :receive).to_sym + end + + def processable_action?(action) + self.class.action_methods.include?(action.to_s) + end + + def dispatch_action(action, data) + logger.info action_signature(action, data) + + if method(action).arity == 1 + public_send action, data + else + public_send action + end + end + + def action_signature(action, data) + "#{self.class.name}##{action}".tap do |signature| + if (arguments = data.except('action')).any? + signature << "(#{arguments.inspect})" + end + end + end + + def transmit_subscription_confirmation + unless subscription_confirmation_sent? + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) + @subscription_confirmation_sent = true + end + end + + def reject_subscription + connection.subscriptions.remove_subscription self + transmit_subscription_rejection + end + + def transmit_subscription_rejection + logger.info "#{self.class.name} is transmitting the subscription rejection" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + end + end + end +end |