aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/channel/base.rb
blob: ad0d3685cd9b66e04aba40622362f57766f3562d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
# frozen_string_literal: true

require "set"
require "active_support/rescuable"

module ActionCable
  module Channel
    # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
    # You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
    # responding to the subscriber's direct requests.
    #
    # Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then
    # lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care
    # not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released
    # as is normally the case with a controller instance that gets thrown away after every request.
    #
    # Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user
    # record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it.
    #
    # The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests
    # can interact with. Here's a quick example:
    #
    #   class ChatChannel < ApplicationCable::Channel
    #     def subscribed
    #       @room = Chat::Room[params[:room_number]]
    #     end
    #
    #     def speak(data)
    #       @room.speak data, user: current_user
    #     end
    #   end
    #
    # The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that
    # subscriber wants to say something in the room.
    #
    # == Action processing
    #
    # Unlike subclasses of ActionController::Base, channels do not follow a RESTful
    # constraint form for their actions. Instead, Action Cable operates through a
    # remote-procedure call model. You can declare any public method on the
    # channel (optionally taking a <tt>data</tt> argument), and this method is
    # automatically exposed as callable to the client.
    #
    # Example:
    #
    #   class AppearanceChannel < ApplicationCable::Channel
    #     def subscribed
    #       @connection_token = generate_connection_token
    #     end
    #
    #     def unsubscribed
    #       current_user.disappear @connection_token
    #     end
    #
    #     def appear(data)
    #       current_user.appear @connection_token, on: data['appearing_on']
    #     end
    #
    #     def away
    #       current_user.away @connection_token
    #     end
    #
    #     private
    #       def generate_connection_token
    #         SecureRandom.hex(36)
    #       end
    #   end
    #
    # In this example, the subscribed and unsubscribed methods are not callable methods, as they
    # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt>
    # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not
    # callable, since it's a private method. You'll see that appear accepts a data
    # parameter, which it then uses as part of its model call. <tt>#away</tt>
    # does not, since it's simply a trigger action.
    #
    # Also note that in this example, <tt>current_user</tt> is available because
    # it was marked as an identifying attribute on the connection. All such
    # identifiers will automatically create a delegation method of the same name
    # on the channel instance.
    #
    # == Rejecting subscription requests
    #
    # A channel can reject a subscription request in the #subscribed callback by
    # invoking the #reject method:
    #
    #   class ChatChannel < ApplicationCable::Channel
    #     def subscribed
    #       @room = Chat::Room[params[:room_number]]
    #       reject unless current_user.can_access?(@room)
    #     end
    #   end
    #
    # In this example, the subscription will be rejected if the
    # <tt>current_user</tt> does not have access to the chat room. On the
    # client-side, the <tt>Channel#rejected</tt> callback will get invoked when
    # the server rejects the subscription request.
    class Base
      include Callbacks
      include PeriodicTimers
      include Streams
      include Naming
      include Broadcasting
      include ActiveSupport::Rescuable

      attr_reader :params, :connection, :identifier
      delegate :logger, to: :connection

      class << self
        # A list of method names that should be considered actions. This
        # includes all public instance methods on a channel, less
        # any internal methods (defined on Base), adding back in
        # any methods that are internal, but still exist on the class
        # itself.
        #
        # ==== Returns
        # * <tt>Set</tt> - A set of all methods that should be considered actions.
        def action_methods
          @action_methods ||= begin
            # All public instance methods of this class, including ancestors
            methods = (public_instance_methods(true) -
              # Except for public instance methods of Base and its ancestors
              ActionCable::Channel::Base.public_instance_methods(true) +
              # Be sure to include shadowed public instance methods of this class
              public_instance_methods(false)).uniq.map(&:to_s)
            methods.to_set
          end
        end

        private
          # action_methods are cached and there is sometimes need to refresh
          # them. ::clear_action_methods! allows you to do that, so next time
          # you run action_methods, they will be recalculated.
          def clear_action_methods! # :doc:
            @action_methods = nil
          end

          # Refresh the cached action_methods when a new action_method is added.
          def method_added(name) # :doc:
            super
            clear_action_methods!
          end
      end

      def initialize(connection, identifier, params = {})
        @connection = connection
        @identifier = identifier
        @params     = params

        # When a channel is streaming via pubsub, we want to delay the confirmation
        # transmission until pubsub subscription is confirmed.
        #
        # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
        @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)

        @reject_subscription = nil
        @subscription_confirmation_sent = nil

        delegate_connection_identifiers
      end

      # Extract the action name from the passed data and process it via the channel. The process will ensure
      # that the action requested is a public method on the channel declared by the user (so not one of the callbacks
      # like #subscribed).
      def perform_action(data)
        action = extract_action(data)

        if processable_action?(action)
          payload = { channel_class: self.class.name, action: action, data: data }
          ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
            dispatch_action(action, data)
          end
        else
          logger.error "Unable to process #{action_signature(action, data)}"
        end
      end

      # This method is called after subscription has been added to the connection
      # and confirms or rejects the subscription.
      def subscribe_to_channel
        run_callbacks :subscribe do
          subscribed
        end

        reject_subscription if subscription_rejected?
        ensure_confirmation_sent
      end

      # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
      # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
      def unsubscribe_from_channel # :nodoc:
        run_callbacks :unsubscribe do
          unsubscribed
        end
      end

      private
        # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams
        # you want this channel to be sending to the subscriber.
        def subscribed # :doc:
          # Override in subclasses
        end

        # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
        # users as offline or the like.
        def unsubscribed # :doc:
          # Override in subclasses
        end

        # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
        # the proper channel identifier marked as the recipient.
        def transmit(data, via: nil) # :doc:
          status = "#{self.class.name} transmitting #{data.inspect.truncate(300)}"
          status += " (via #{via})" if via
          logger.debug(status)

          payload = { channel_class: self.class.name, data: data, via: via }
          ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
            connection.transmit identifier: @identifier, message: data
          end
        end

        def ensure_confirmation_sent # :doc:
          return if subscription_rejected?
          @defer_subscription_confirmation_counter.decrement
          transmit_subscription_confirmation unless defer_subscription_confirmation?
        end

        def defer_subscription_confirmation! # :doc:
          @defer_subscription_confirmation_counter.increment
        end

        def defer_subscription_confirmation? # :doc:
          @defer_subscription_confirmation_counter.value > 0
        end

        def subscription_confirmation_sent? # :doc:
          @subscription_confirmation_sent
        end

        def reject # :doc:
          @reject_subscription = true
        end

        def subscription_rejected? # :doc:
          @reject_subscription
        end

        def delegate_connection_identifiers
          connection.identifiers.each do |identifier|
            define_singleton_method(identifier) do
              connection.send(identifier)
            end
          end
        end

        def extract_action(data)
          (data["action"].presence || :receive).to_sym
        end

        def processable_action?(action)
          self.class.action_methods.include?(action.to_s) unless subscription_rejected?
        end

        def dispatch_action(action, data)
          logger.info action_signature(action, data)

          if method(action).arity == 1
            public_send action, data
          else
            public_send action
          end
        rescue Exception => exception
          rescue_with_handler(exception) || raise
        end

        def action_signature(action, data)
          (+"#{self.class.name}##{action}").tap do |signature|
            if (arguments = data.except("action")).any?
              signature << "(#{arguments.inspect})"
            end
          end
        end

        def transmit_subscription_confirmation
          unless subscription_confirmation_sent?
            logger.info "#{self.class.name} is transmitting the subscription confirmation"

            ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
              connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
              @subscription_confirmation_sent = true
            end
          end
        end

        def reject_subscription
          connection.subscriptions.remove_subscription self
          transmit_subscription_rejection
        end

        def transmit_subscription_rejection
          logger.info "#{self.class.name} is transmitting the subscription rejection"

          ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
            connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
          end
        end
    end
  end
end