# path: root/lib/action_cable/connection/base.rb
# blob: b2d758a1244db69f25cf10e30146fbcc68982666
module ActionCable
  module Connection
    # One client connection to the cable server.
    #
    # An instance wraps the Rack +env+ of an incoming request. #process either
    # upgrades the request to a Faye::WebSocket or answers with a 404. Once the
    # socket is open, a ping frame is pushed every PING_INTERVAL seconds and
    # all real work (connection setup, incoming frames, teardown) is handed to
    # the server's worker pool via +worker_pool.async.invoke+.
    #
    # Incoming text frames are JSON objects whose 'action' key selects the
    # operation ('subscribe', 'unsubscribe' or 'message') and whose
    # 'identifier' key names the subscription the operation applies to.
    #
    # Optional hooks: +connect+ is called while the connection is being
    # initialized and +disconnect+ after it has been closed, each only when
    # the instance responds to it.
    class Base
      include Registry

      # Interval, in seconds, between two ping frames sent to the client.
      PING_INTERVAL = 3

      attr_reader :env, :server
      delegate :worker_pool, :pubsub, :logger, to: :server

      # server - object providing worker_pool, pubsub, logger and
      #          registered_channels.
      # env    - Rack environment of the incoming request.
      def initialize(server, env)
        @server = server
        @env = env
        # Frames that arrive before initialize_connection has finished are
        # parked in @pending_messages and replayed afterwards.
        @accept_messages = false
        @pending_messages = []
        # Present from the start so that the public #cleanup_subscriptions
        # does not fail on nil when no websocket was ever opened.
        @subscriptions = {}
      end

      # Entry point for a request.
      #
      # Returns the Rack response of the websocket for upgrade requests, or a
      # plain-text 404 Rack triplet for anything else.
      def process
        logger.info "[ActionCable] #{started_request_message}"

        return invalid_request unless websocket?

        @subscriptions = {}
        @websocket = Faye::WebSocket.new(@env)

        @websocket.on(:open) do |_event|
          # Ping immediately, then keep pinging on a timer.
          broadcast_ping_timestamp
          @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
          worker_pool.async.invoke(self, :initialize_connection)
        end

        @websocket.on(:message) do |event|
          message = event.data

          # Only String payloads are handled; anything else is dropped.
          next unless message.is_a?(String)

          if @accept_messages
            worker_pool.async.invoke(self, :received_data, message)
          else
            @pending_messages << message
          end
        end

        @websocket.on(:close) do |_event|
          logger.info "[ActionCable] #{finished_request_message}"

          worker_pool.async.invoke(self, :on_connection_closed)
          EventMachine.cancel_timer(@ping_timer) if @ping_timer
        end

        @websocket.rack_response
      end

      # Decodes one JSON frame and dispatches on its 'action' key. Frames are
      # ignored when the websocket is no longer open, and frames with an
      # unknown action are silently skipped.
      #
      # data - JSON String as received from the client.
      #
      # Errors raised while decoding +data+ are not rescued here and propagate
      # to the caller.
      def received_data(data)
        return unless websocket_alive?

        data = ActiveSupport::JSON.decode data

        case data['action']
        when 'subscribe'
          subscribe_channel(data)
        when 'unsubscribe'
          unsubscribe_channel(data)
        when 'message'
          process_message(data)
        end
      end

      # Calls +unsubscribe+ on every channel currently held by this
      # connection. The subscription registry itself is left untouched.
      def cleanup_subscriptions
        @subscriptions.each_value(&:unsubscribe)
      end

      # Logs +data+ and sends it to the client over the websocket.
      def broadcast(data)
        logger.info "[ActionCable] Sending data: #{data}"
        @websocket.send data
      end

      # Logs the closure and closes the websocket.
      # NOTE(review): not called from within this file; presumably invoked by
      # the worker pool when a dispatched job raises - confirm against caller.
      def handle_exception
        logger.error "[ActionCable] Closing connection"

        @websocket.close
      end

      private
        # Runs the optional +connect+ hook, registers the connection, then
        # starts accepting frames and replays the ones queued in the meantime.
        def initialize_connection
          connect if respond_to?(:connect)
          register_connection

          @accept_messages = true
          until @pending_messages.empty?
            worker_pool.async.invoke(self, :received_data, @pending_messages.shift)
          end
        end

        # Teardown after the socket has closed: unsubscribe all channels,
        # release internal redis subscriptions, then run the optional
        # +disconnect+ hook.
        def on_connection_closed
          cleanup_subscriptions
          cleanup_internal_redis_subscriptions
          disconnect if respond_to?(:disconnect)
        end

        # Sends a '_ping' frame carrying the current Unix timestamp.
        def broadcast_ping_timestamp
          broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
        end

        # Creates a subscription for the channel named in the frame.
        #
        # data['identifier'] is itself a JSON document; its :channel entry is
        # matched against +find_name+ of each registered channel class. The
        # raw identifier string is the key under which the subscription is
        # stored. Any failure is logged rather than raised.
        def subscribe_channel(data)
          id_key = data['identifier']
          id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access

          subscription_klass = server.registered_channels.find do |channel_klass|
            channel_klass.find_name == id_options[:channel]
          end

          if subscription_klass
            logger.info "[ActionCable] Subscribing to channel: #{id_key}"
            @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
          else
            logger.error "[ActionCable] Subscription class not found (#{data.inspect})"
          end
        # NOTE(review): rescuing Exception also traps SignalException and
        # SystemExit raised on this thread. Left as-is because narrowing it to
        # StandardError changes which failures reach the caller - confirm
        # whether the broad rescue is intended.
        rescue Exception => e
          logger.error "[ActionCable] Could not subscribe to channel (#{data.inspect})"
          log_exception(e)
        end

        # Forwards the decoded 'data' payload of a frame to the subscription
        # named by its 'identifier'. Unknown identifiers and any failure are
        # logged rather than raised.
        def process_message(message)
          subscription = @subscriptions[message['identifier']]

          if subscription
            subscription.receive_data(ActiveSupport::JSON.decode(message['data']))
          else
            logger.error "[ActionCable] Unable to process message because no subscription was found (#{message.inspect})"
          end
        # NOTE(review): see subscribe_channel regarding the breadth of this
        # rescue clause.
        rescue Exception => e
          logger.error "[ActionCable] Could not process message (#{message.inspect})"
          log_exception(e)
        end

        # Unsubscribes and forgets the subscription named by
        # data['identifier']. An unknown identifier is logged instead of
        # raising NoMethodError on nil, matching process_message.
        def unsubscribe_channel(data)
          identifier = data['identifier']
          logger.info "[ActionCable] Unsubscribing from channel: #{identifier}"

          subscription = @subscriptions[identifier]

          if subscription
            subscription.unsubscribe
            @subscriptions.delete(identifier)
          else
            logger.error "[ActionCable] Unable to unsubscribe because no subscription was found (#{data.inspect})"
          end
        end

        # Rack response for requests that are not websocket upgrades.
        def invalid_request
          logger.info "[ActionCable] #{finished_request_message}"
          [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
        end

        # True when a websocket exists and is in the OPEN state.
        def websocket_alive?
          @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
        end

        # Memoized ActionDispatch::Request built from the Rack env.
        def request
          @request ||= ActionDispatch::Request.new(env)
        end

        # Whether the request is a websocket upgrade. Memoized with defined?
        # so that a false answer is cached as well (||= would recompute it on
        # every call).
        def websocket?
          return @is_websocket if defined?(@is_websocket)

          @is_websocket = Faye::WebSocket.websocket?(@env)
        end

        # Log line emitted when request processing starts.
        def started_request_message
          'Started %s "%s"%s for %s at %s' % [
            request.request_method,
            request.filtered_path,
            websocket? ? ' [Websocket]' : '',
            request.ip,
            Time.now.to_default_s ]
        end

        # Log line emitted when the request or socket is finished.
        def finished_request_message
          'Finished "%s"%s for %s at %s' % [
            request.filtered_path,
            websocket? ? ' [Websocket]' : '',
            request.ip,
            Time.now.to_default_s ]
        end

        # Logs class, message and backtrace of +e+. Array() covers exceptions
        # whose backtrace is nil.
        def log_exception(e)
          logger.error "[ActionCable] There was an exception - #{e.class}(#{e.message})"
          logger.error Array(e.backtrace).join("\n")
        end
    end
  end
end