From bf40bddfceebaff637161be6c5d992d6978679ff Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Mon, 14 Dec 2015 15:48:54 +0100 Subject: Get ready to merge into Rails --- actioncable/lib/action_cable/channel/base.rb | 274 +++++++++++++++++++++ .../lib/action_cable/channel/broadcasting.rb | 29 +++ actioncable/lib/action_cable/channel/callbacks.rb | 35 +++ actioncable/lib/action_cable/channel/naming.rb | 22 ++ .../lib/action_cable/channel/periodic_timers.rb | 41 +++ actioncable/lib/action_cable/channel/streams.rb | 114 +++++++++ 6 files changed, 515 insertions(+) create mode 100644 actioncable/lib/action_cable/channel/base.rb create mode 100644 actioncable/lib/action_cable/channel/broadcasting.rb create mode 100644 actioncable/lib/action_cable/channel/callbacks.rb create mode 100644 actioncable/lib/action_cable/channel/naming.rb create mode 100644 actioncable/lib/action_cable/channel/periodic_timers.rb create mode 100644 actioncable/lib/action_cable/channel/streams.rb (limited to 'actioncable/lib/action_cable/channel') diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb new file mode 100644 index 0000000000..ca903a810d --- /dev/null +++ b/actioncable/lib/action_cable/channel/base.rb @@ -0,0 +1,274 @@ +require 'set' + +module ActionCable + module Channel + # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection. + # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply + # responding to the subscriber's direct requests. + # + # Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then + # lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care + # not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released + # as is normally the case with a controller instance that gets thrown away after every request. + # + # Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user + # record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it. + # + # The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests + # can interact with. Here's a quick example: + # + # class ChatChannel < ApplicationCable::Channel + # def subscribed + # @room = Chat::Room[params[:room_number]] + # end + # + # def speak(data) + # @room.speak data, user: current_user + # end + # end + # + # The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that + # subscriber wants to say something in the room. + # + # == Action processing + # + # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's an remote-procedure call model. You can + # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client. + # + # Example: + # + # class AppearanceChannel < ApplicationCable::Channel + # def subscribed + # @connection_token = generate_connection_token + # end + # + # def unsubscribed + # current_user.disappear @connection_token + # end + # + # def appear(data) + # current_user.appear @connection_token, on: data['appearing_on'] + # end + # + # def away + # current_user.away @connection_token + # end + # + # private + # def generate_connection_token + # SecureRandom.hex(36) + # end + # end + # + # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away + # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then + # uses as part of its model call. #away does not, it's simply a trigger action. + # + # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. + # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # + # == Rejecting subscription requests + # + # A channel can reject a subscription request in the #subscribed callback by invoking #reject! + # + # Example: + # + # class ChatChannel < ApplicationCable::Channel + # def subscribed + # @room = Chat::Room[params[:room_number]] + # reject unless current_user.can_access?(@room) + # end + # end + # + # In this example, the subscription will be rejected if the current_user does not have access to the chat room. + # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. + class Base + include Callbacks + include PeriodicTimers + include Streams + include Naming + include Broadcasting + + attr_reader :params, :connection, :identifier + delegate :logger, to: :connection + + class << self + # A list of method names that should be considered actions. This + # includes all public instance methods on a channel, less + # any internal methods (defined on Base), adding back in + # any methods that are internal, but still exist on the class + # itself. + # + # ==== Returns + # * Set - A set of all methods that should be considered actions. + def action_methods + @action_methods ||= begin + # All public instance methods of this class, including ancestors + methods = (public_instance_methods(true) - + # Except for public instance methods of Base and its ancestors + ActionCable::Channel::Base.public_instance_methods(true) + + # Be sure to include shadowed public instance methods of this class + public_instance_methods(false)).uniq.map(&:to_s) + methods.to_set + end + end + + protected + # action_methods are cached and there is sometimes need to refresh + # them. ::clear_action_methods! allows you to do that, so next time + # you run action_methods, they will be recalculated + def clear_action_methods! + @action_methods = nil + end + + # Refresh the cached action_methods when a new action_method is added. + def method_added(name) + super + clear_action_methods! + end + end + + def initialize(connection, identifier, params = {}) + @connection = connection + @identifier = identifier + @params = params + + # When a channel is streaming via redis pubsub, we want to delay the confirmation + # transmission until redis pubsub subscription is confirmed. + @defer_subscription_confirmation = false + + delegate_connection_identifiers + subscribe_to_channel + end + + # Extract the action name from the passed data and process it via the channel. The process will ensure + # that the action requested is a public method on the channel declared by the user (so not one of the callbacks + # like #subscribed). + def perform_action(data) + action = extract_action(data) + + if processable_action?(action) + dispatch_action(action, data) + else + logger.error "Unable to process #{action_signature(action, data)}" + end + end + + # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. + def unsubscribe_from_channel + run_callbacks :unsubscribe do + unsubscribed + end + end + + + protected + # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams + # you want this channel to be sending to the subscriber. + def subscribed + # Override in subclasses + end + + # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking + # people as offline or the like. + def unsubscribed + # Override in subclasses + end + + # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with + # the proper channel identifier marked as the recipient. + def transmit(data, via: nil) + logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + end + + def defer_subscription_confirmation! + @defer_subscription_confirmation = true + end + + def defer_subscription_confirmation? + @defer_subscription_confirmation + end + + def subscription_confirmation_sent? + @subscription_confirmation_sent + end + + def reject + @reject_subscription = true + end + + def subscription_rejected? + @reject_subscription + end + + private + def delegate_connection_identifiers + connection.identifiers.each do |identifier| + define_singleton_method(identifier) do + connection.send(identifier) + end + end + end + + + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + if subscription_rejected? + reject_subscription + else + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + end + + + def extract_action(data) + (data['action'].presence || :receive).to_sym + end + + def processable_action?(action) + self.class.action_methods.include?(action.to_s) + end + + def dispatch_action(action, data) + logger.info action_signature(action, data) + + if method(action).arity == 1 + public_send action, data + else + public_send action + end + end + + def action_signature(action, data) + "#{self.class.name}##{action}".tap do |signature| + if (arguments = data.except('action')).any? + signature << "(#{arguments.inspect})" + end + end + end + + def transmit_subscription_confirmation + unless subscription_confirmation_sent? + logger.info "#{self.class.name} is transmitting the subscription confirmation" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) + @subscription_confirmation_sent = true + end + end + + def reject_subscription + connection.subscriptions.remove_subscription self + transmit_subscription_rejection + end + + def transmit_subscription_rejection + logger.info "#{self.class.name} is transmitting the subscription rejection" + connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + end + end + end +end diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb new file mode 100644 index 0000000000..afc23d7d1a --- /dev/null +++ b/actioncable/lib/action_cable/channel/broadcasting.rb @@ -0,0 +1,29 @@ +require 'active_support/core_ext/object/to_param' + +module ActionCable + module Channel + module Broadcasting + extend ActiveSupport::Concern + + delegate :broadcasting_for, to: :class + + class_methods do + # Broadcast a hash to a unique broadcasting for this model in this channel. + def broadcast_to(model, message) + ActionCable.server.broadcast(broadcasting_for([ channel_name, model ]), message) + end + + def broadcasting_for(model) #:nodoc: + case + when model.is_a?(Array) + model.map { |m| broadcasting_for(m) }.join(':') + when model.respond_to?(:to_gid_param) + model.to_gid_param + else + model.to_param + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/channel/callbacks.rb b/actioncable/lib/action_cable/channel/callbacks.rb new file mode 100644 index 0000000000..295d750e86 --- /dev/null +++ b/actioncable/lib/action_cable/channel/callbacks.rb @@ -0,0 +1,35 @@ +require 'active_support/callbacks' + +module ActionCable + module Channel + module Callbacks + extend ActiveSupport::Concern + include ActiveSupport::Callbacks + + included do + define_callbacks :subscribe + define_callbacks :unsubscribe + end + + class_methods do + def before_subscribe(*methods, &block) + set_callback(:subscribe, :before, *methods, &block) + end + + def after_subscribe(*methods, &block) + set_callback(:subscribe, :after, *methods, &block) + end + alias_method :on_subscribe, :after_subscribe + + def before_unsubscribe(*methods, &block) + set_callback(:unsubscribe, :before, *methods, &block) + end + + def after_unsubscribe(*methods, &block) + set_callback(:unsubscribe, :after, *methods, &block) + end + alias_method :on_unsubscribe, :after_unsubscribe + end + end + end +end diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb new file mode 100644 index 0000000000..4c9d53b15a --- /dev/null +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -0,0 +1,22 @@ +module ActionCable + module Channel + module Naming + extend ActiveSupport::Concern + + class_methods do + # Returns the name of the channel, underscored, without the Channel ending. + # If the channel is in a namespace, then the namespaces are represented by single + # colon separators in the channel name. + # + # ChatChannel.channel_name # => 'chat' + # Chats::AppearancesChannel.channel_name # => 'chats:appearances' + def channel_name + @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore + end + end + + # Delegates to the class' channel_name + delegate :channel_name, to: :class + end + end +end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb new file mode 100644 index 0000000000..25fe8e5e54 --- /dev/null +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -0,0 +1,41 @@ +module ActionCable + module Channel + module PeriodicTimers + extend ActiveSupport::Concern + + included do + class_attribute :periodic_timers, instance_reader: false + self.periodic_timers = [] + + after_subscribe :start_periodic_timers + after_unsubscribe :stop_periodic_timers + end + + module ClassMethods + # Allow you to call a private method every so often seconds. This periodic timer can be useful + # for sending a steady flow of updates to a client based off an object that was configured on subscription. + # It's an alternative to using streams if the channel is able to do the work internally. + def periodically(callback, every:) + self.periodic_timers += [ [ callback, every: every ] ] + end + end + + private + def active_periodic_timers + @active_periodic_timers ||= [] + end + + def start_periodic_timers + self.class.periodic_timers.each do |callback, options| + active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do + connection.worker_pool.async.run_periodic_timer(self, callback) + end + end + end + + def stop_periodic_timers + active_periodic_timers.each { |timer| timer.cancel } + end + end + end +end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb new file mode 100644 index 0000000000..b5ffa17f72 --- /dev/null +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -0,0 +1,114 @@ +module ActionCable + module Channel + # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data + # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not + # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later. + # + # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between + # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new + # comments on a given page: + # + # class CommentsChannel < ApplicationCable::Channel + # def follow(data) + # stream_from "comments_for_#{data['recording_id']}" + # end + # + # def unfollow + # stop_all_streams + # end + # end + # + # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there. + # That looks like so from that side of things: + # + # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' + # + # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. + # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE` + # + # class CommentsChannel < ApplicationCable::Channel + # def subscribed + # post = Post.find(params[:id]) + # stream_for post + # end + # end + # + # You can then broadcast to this channel using: + # + # CommentsChannel.broadcast_to(@post, @comment) + # + # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out. + # Example below shows how you can use this to provide performance introspection in the process: + # + # class ChatChannel < ApplicationCable::Channel + # def subscribed + # @room = Chat::Room[params[:room_number]] + # + # stream_for @room, -> (encoded_message) do + # message = ActiveSupport::JSON.decode(encoded_message) + # + # if message['originated_at'].present? + # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) + # + # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing + # logger.info "Message took #{elapsed_time}s to arrive" + # end + # + # transmit message + # end + # end + # + # You can stop streaming from all broadcasts by calling #stop_all_streams. + module Streams + extend ActiveSupport::Concern + + included do + on_unsubscribe :stop_all_streams + end + + # Start streaming from the named broadcasting pubsub queue. Optionally, you can pass a callback that'll be used + # instead of the default of just transmitting the updates straight to the subscriber. + def stream_from(broadcasting, callback = nil) + # Hold off the confirmation until pubsub#subscribe is successful + defer_subscription_confirmation! + + callback ||= default_stream_callback(broadcasting) + streams << [ broadcasting, callback ] + + EM.next_tick do + pubsub.subscribe(broadcasting, &callback).callback do |reply| + transmit_subscription_confirmation + logger.info "#{self.class.name} is streaming from #{broadcasting}" + end + end + end + + # Start streaming the pubsub queue for the model in this channel. Optionally, you can pass a + # callback that'll be used instead of the default of just transmitting the updates straight + # to the subscriber. + def stream_for(model, callback = nil) + stream_from(broadcasting_for([ channel_name, model ]), callback) + end + + def stop_all_streams + streams.each do |broadcasting, callback| + pubsub.unsubscribe_proc broadcasting, callback + logger.info "#{self.class.name} stopped streaming from #{broadcasting}" + end.clear + end + + private + delegate :pubsub, to: :connection + + def streams + @_streams ||= [] + end + + def default_stream_callback(broadcasting) + -> (message) do + transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + end + end + end + end +end -- cgit v1.2.3