aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/connection.rb
blob: 00fb8ca817cef62802b74a69d283ee3c25195c73 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
module ActionCable
  # Handles one client connection for a server: upgrades the Rack request to a
  # Faye WebSocket, pings the client periodically, and routes incoming JSON
  # commands ('subscribe', 'unsubscribe', 'message') to channel subscriptions
  # keyed by their identifier string.
  #
  # +connect+ and +disconnect+ are optional hooks; each is invoked only when
  # the instance responds to it.
  class Connection
    # Interval handed to EventMachine.add_periodic_timer between pings (seconds).
    PING_INTERVAL = 3

    attr_reader :env, :server
    delegate :worker_pool, :pubsub, :logger, to: :server

    # server - object that worker_pool, pubsub and logger are delegated to; it
    #          must also answer registered_channels.
    # env    - Rack environment of the incoming request.
    def initialize(server, env)
      @server = server
      @env = env
      # Incoming messages are buffered in @pending_messages until
      # initialize_client flips @accept_messages.
      @accept_messages = false
      @pending_messages = []
    end

    # Sets up the websocket and its event handlers for this request.
    #
    # Returns the websocket's Rack response, or a 404 Rack response triple when
    # the request is not a websocket request.
    def process
      return invalid_request unless Faye::WebSocket.websocket?(@env)

      @subscriptions = {}
      @websocket = Faye::WebSocket.new(@env)

      @websocket.on(:open) do |_event|
        # Ping immediately, then on every PING_INTERVAL tick.
        broadcast_ping_timestamp
        @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
        worker_pool.async.invoke(self, :initialize_client)
      end

      @websocket.on(:message) do |event|
        message = event.data
        # Only String payloads are handled; anything else is dropped.
        next unless message.is_a?(String)

        if @accept_messages
          worker_pool.async.invoke(self, :received_data, message)
        else
          @pending_messages << message
        end
      end

      @websocket.on(:close) do |_event|
        worker_pool.async.invoke(self, :cleanup_subscriptions)
        worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)

        EventMachine.cancel_timer(@ping_timer) if @ping_timer
      end

      @websocket.rack_response
    end

    # Decodes a raw JSON message and dispatches it on its 'action' key.
    # Does nothing when the websocket is no longer open. Unrecognized actions
    # are logged instead of being silently ignored.
    #
    # data - JSON String received from the client.
    def received_data(data)
      return unless websocket_alive?

      command = ActiveSupport::JSON.decode(data)

      case command['action']
      when 'subscribe'
        subscribe_channel(command)
      when 'unsubscribe'
        unsubscribe_channel(command)
      when 'message'
        process_message(command)
      else
        logger.error "Unable to process action: #{command['action'].inspect}"
      end
    end

    # Calls +unsubscribe+ on every channel currently held by this connection.
    # The subscriptions themselves stay in the hash.
    def cleanup_subscriptions
      @subscriptions.each_value do |channel|
        channel.unsubscribe
      end
    end

    # Logs the payload and sends it over the websocket.
    #
    # data - payload to send (the ping path passes a JSON String).
    def broadcast(data)
      logger.info "Sending data: #{data}"
      @websocket.send data
    end

    # Logs an error and closes the websocket.
    # NOTE(review): not called from within this file; presumably invoked by the
    # worker when a job raises -- confirm against the caller.
    def handle_exception
      logger.error "[ActionCable] Closing connection"

      @websocket.close
    end

    private
      # Runs the optional +connect+ hook, starts accepting messages, and hands
      # every message buffered so far to the worker pool in arrival order.
      def initialize_client
        connect if respond_to?(:connect)
        @accept_messages = true

        worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
      end

      # Sends a '_ping' message carrying the current Unix timestamp.
      def broadcast_ping_timestamp
        broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
      end

      # Instantiates the registered channel class whose +find_name+ matches the
      # :channel entry of the decoded identifier and stores it under the raw
      # identifier string. Logs an error when no registered channel matches.
      #
      # data - decoded command Hash; data['identifier'] is itself a JSON String.
      def subscribe_channel(data)
        id_key = data['identifier']
        id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
        channel_name = id_options[:channel]

        subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == channel_name }

        if subscription_klass
          logger.info "Subscribing to channel: #{id_key}"
          @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
        else
          logger.error "Unable to subscribe to channel: #{id_key}"
        end
      end

      # Forwards the decoded 'data' payload to the subscription matching the
      # message's identifier, or logs an error when there is none.
      #
      # message - decoded command Hash with 'identifier' and JSON-encoded 'data'.
      def process_message(message)
        subscription = @subscriptions[message['identifier']]

        if subscription
          subscription.receive_data(ActiveSupport::JSON.decode(message['data']))
        else
          logger.error "Unable to process message: #{message}"
        end
      end

      # Unsubscribes and forgets the subscription for the given identifier.
      # An unknown identifier is logged rather than raising NoMethodError on
      # nil, so a stray or repeated 'unsubscribe' command is handled the same
      # way as the other commands' failure paths.
      #
      # data - decoded command Hash with an 'identifier' key.
      def unsubscribe_channel(data)
        id_key = data['identifier']
        subscription = @subscriptions[id_key]

        if subscription
          logger.info "Unsubscribing from channel: #{id_key}"
          subscription.unsubscribe
          @subscriptions.delete(id_key)
        else
          logger.error "Unable to unsubscribe from channel: #{id_key}"
        end
      end

      # Rack response triple returned for non-websocket requests.
      def invalid_request
        [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
      end

      # True when a websocket exists and its ready state is OPEN.
      def websocket_alive?
        @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
      end

  end
end