aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/server.rb
blob: ebf98171c13b3489ee54404f2ff69362d62d5177 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
module ActionCable
  # Rack endpoint that upgrades a request to a WebSocket (via Faye::WebSocket)
  # and routes the JSON commands received on it to channel subscriptions.
  #
  # A fresh instance is built for every request that goes through .call.
  # Work triggered by socket events is not run inline: it is handed to the
  # shared worker pool with `worker_pool.async.invoke(self, <method>, ...)`.
  #
  # `connect` and `disconnect` are optional hooks: they are only invoked when
  # the instance responds to them (they are not defined in this class, so they
  # are presumably supplied by subclasses).
  class Server
    # Channel classes a client may subscribe to. #subscribe_channel picks the
    # class whose `find_name` equals the requested channel name.
    class_attribute :registered_channels
    self.registered_channels = Set.new

    # Size handed to ActionCable::Worker.pool when the pool is first built.
    class_attribute :worker_pool_size
    self.worker_pool_size = 100

    cattr_accessor(:logger, instance_reader: true) { Rails.logger }

    # Interval passed to EventMachine.add_periodic_timer for the '_ping'
    # frame (EventMachine timers take their interval in seconds).
    PING_INTERVAL = 3

    class << self
      # Adds the given channel classes to the registry.
      def register_channels(*channel_classes)
        self.registered_channels += channel_classes
      end

      # Rack entry point: builds a server for this request and processes it.
      # Returns whatever #process returns (a Rack response).
      def call(env)
        new(env).process
      end

      # Worker pool used for all asynchronous dispatching; built on first use
      # and memoized on the class.
      def worker_pool
        @worker_pool ||= ActionCable::Worker.pool(size: worker_pool_size)
      end
    end

    attr_reader :env

    # env - the Rack environment of the incoming request.
    def initialize(env)
      @env = env
      # Messages are buffered in @pending_messages until #initialize_client
      # flips this flag.
      @accept_messages = false
      @pending_messages = []
    end

    # Handles the request. For a WebSocket request this wires up the open,
    # message and close handlers and returns the socket's Rack response;
    # for anything else it returns the 404 response from #invalid_request.
    def process
      return invalid_request unless Faye::WebSocket.websocket?(@env)

      @subscriptions = {}
      @websocket = Faye::WebSocket.new(@env)

      @websocket.on(:open) do |_event|
        # Send one ping right away, then keep pinging on a timer.
        broadcast_ping_timestamp
        @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp }
        worker_pool.async.invoke(self, :initialize_client)
      end

      @websocket.on(:message) do |event|
        message = event.data

        # Only String payloads are handled; anything else is ignored.
        if message.is_a?(String)
          if @accept_messages
            worker_pool.async.invoke(self, :received_data, message)
          else
            @pending_messages << message
          end
        end
      end

      @websocket.on(:close) do |_event|
        # Stop pinging first so the timer is released even if dispatching
        # the cleanup work below raises.
        EventMachine.cancel_timer(@ping_timer) if @ping_timer

        worker_pool.async.invoke(self, :cleanup_subscriptions)
        worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect)
      end

      @websocket.rack_response
    end

    # Decodes one JSON command and dispatches on its 'action' key
    # ('subscribe', 'unsubscribe' or 'message'). Does nothing when the socket
    # is no longer open; logs an error for any other action.
    #
    # data - the raw JSON String received from the client.
    def received_data(data)
      return unless websocket_alive?

      data = ActiveSupport::JSON.decode data

      case data['action']
      when 'subscribe'
        subscribe_channel(data)
      when 'unsubscribe'
        unsubscribe_channel(data)
      when 'message'
        process_message(data)
      else
        logger.error "Received unrecognized command: #{data}"
      end
    end

    # Calls `unsubscribe` on every current subscription. The subscriptions
    # themselves are left in @subscriptions.
    def cleanup_subscriptions
      @subscriptions.each_value(&:unsubscribe)
    end

    # Logs the payload and sends it over the WebSocket.
    def broadcast(data)
      logger.info "Sending data: #{data}"
      @websocket.send data
    end

    # Instance-level shortcut to the class-wide worker pool.
    def worker_pool
      self.class.worker_pool
    end

    # Logs an error and closes the WebSocket. Nothing in this file calls it;
    # presumably the worker invokes it when a dispatched method raises --
    # confirm against ActionCable::Worker.
    def handle_exception
      logger.error "[ActionCable] Closing connection"

      @websocket.close
    end

    private
      # Runs the optional `connect` hook, starts accepting messages and then
      # dispatches, in arrival order, everything buffered while connecting.
      def initialize_client
        connect if respond_to?(:connect)
        @accept_messages = true

        until @pending_messages.empty?
          worker_pool.async.invoke(self, :received_data, @pending_messages.shift)
        end
      end

      # Sends a '_ping' frame carrying the current Unix timestamp.
      def broadcast_ping_timestamp
        broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json)
      end

      # Instantiates the registered channel named by the JSON-encoded
      # 'identifier' and stores it under that identifier string. Logs an
      # error when no registered channel matches.
      def subscribe_channel(data)
        id_key = data['identifier']
        id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access

        subscription_klass = registered_channels.find do |channel_klass|
          channel_klass.find_name == id_options[:channel]
        end

        if subscription_klass
          logger.info "Subscribing to channel: #{id_key}"
          @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
        else
          logger.error "Unable to subscribe to channel: #{id_key}"
        end
      end

      # Forwards the JSON-decoded 'data' payload to the subscription named by
      # 'identifier'. Logs an error when there is no such subscription.
      def process_message(message)
        subscription = @subscriptions[message['identifier']]

        if subscription
          subscription.receive_data(ActiveSupport::JSON.decode message['data'])
        else
          logger.error "Unable to process message: #{message}"
        end
      end

      # Unsubscribes and forgets the subscription named by 'identifier'.
      # An identifier with no subscription is logged instead of raising
      # NoMethodError on nil, matching the other "Unable to ..." branches.
      def unsubscribe_channel(data)
        id_key = data['identifier']
        subscription = @subscriptions[id_key]

        if subscription
          logger.info "Unsubscribing from channel: #{id_key}"
          subscription.unsubscribe
          @subscriptions.delete(id_key)
        else
          logger.error "Unable to unsubscribe from channel: #{id_key}"
        end
      end

      # Rack response returned for requests that are not WebSocket upgrades.
      def invalid_request
        [404, {'Content-Type' => 'text/plain'}, ['Page not found']]
      end

      # True when a socket exists and its ready state is OPEN.
      def websocket_alive?
        @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
      end
  end
end