aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/connection/base.rb
blob: 1a9aac0731664b2f0f3b3bd3b25fc941f8c0c355 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
module ActionCable
  module Connection
    class Base
      include Identification
      include InternalChannel
      include Authorization

      attr_reader :server, :env
      delegate :worker_pool, :pubsub, to: :server

      attr_reader :logger

      def initialize(server, env)
        @server, @env = server, env

        @logger = new_tagged_logger

        @websocket      = ActionCable::Connection::WebSocket.new(env)
        @heartbeat      = ActionCable::Connection::Heartbeat.new(self)
        @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
        @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

        @started_at = Time.now
      end

      def process
        logger.info started_request_message

        if websocket.possible?
          websocket.on(:open)    { |event| send_async :on_open   }
          websocket.on(:message) { |event| on_message event.data }
          websocket.on(:close)   { |event| send_async :on_close  }
          
          respond_to_successful_request
        else
          respond_to_invalid_request
        end
      end

      def receive(data_in_json)
        if websocket.alive?
          subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
        else
          logger.error "Received data without a live websocket (#{data.inspect})"
        end
      end

      def transmit(data)
        websocket.transmit data
      end

      def close
        logger.error "Closing connection"
        websocket.close
      end


      def send_async(method, *arguments)
        worker_pool.async.invoke(self, method, *arguments)
      end

      def statistics
        { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
      end


      protected
        def request
          @request ||= ActionDispatch::Request.new(Rails.application.env_config.merge(env))
        end

        def cookies
          request.cookie_jar
        end


      private
        attr_reader :websocket
        attr_reader :heartbeat, :subscriptions, :message_buffer

        def on_open
          server.add_connection(self)

          connect if respond_to?(:connect)
          subscribe_to_internal_channel
          heartbeat.start

          message_buffer.process!
        rescue ActionCable::Connection::Authorization::UnauthorizedError
          respond_to_invalid_request
          close
        end

        def on_message(message)
          message_buffer.append message
        end

        def on_close
          logger.info finished_request_message

          server.remove_connection(self)

          subscriptions.cleanup
          unsubscribe_from_internal_channel
          heartbeat.stop

          disconnect if respond_to?(:disconnect)
        end


        def respond_to_successful_request
          websocket.rack_response
        end

        def respond_to_invalid_request
          logger.info finished_request_message
          [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
        end


        # Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
        def new_tagged_logger
          TaggedLoggerProxy.new server.logger, 
            tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
        end

        def started_request_message
          'Started %s "%s"%s for %s at %s' % [
            request.request_method,
            request.filtered_path,
            websocket.possible? ? ' [Websocket]' : '',
            request.ip,
            Time.now.to_default_s ]
        end

        def finished_request_message
          'Finished "%s"%s for %s at %s' % [
            request.filtered_path,
            websocket.possible? ? ' [Websocket]' : '',
            request.ip,
            Time.now.to_default_s ]
        end
    end
  end
end