aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/stream_event_loop.rb
blob: 2d1af0ff9f02e5d50fd5e6030161efe29fcaa42c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
require "nio"
require "thread"

module ActionCable
  module Connection
    class StreamEventLoop
      def initialize
        @nio = @executor = @thread = nil
        @map = {}
        @stopping = false
        @todo = Queue.new

        @spawn_mutex = Mutex.new
      end

      def timer(interval, &block)
        Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
      end

      def post(task = nil, &block)
        task ||= block

        spawn
        @executor << task
      end

      def attach(io, stream)
        @todo << lambda do
          @map[io] = @nio.register(io, :r)
          @map[io].value = stream
        end
        wakeup
      end

      def detach(io, stream)
        @todo << lambda do
          @nio.deregister io
          @map.delete io
          io.close
        end
        wakeup
      end

      def writes_pending(io)
        @todo << lambda do
          if monitor = @map[io]
            monitor.interests = :rw
          end
        end
        wakeup
      end

      def stop
        @stopping = true
        wakeup if @nio
      end

      private
        def spawn
          return if @thread && @thread.status

          @spawn_mutex.synchronize do
            return if @thread && @thread.status

            @nio ||= NIO::Selector.new

            @executor ||= Concurrent::ThreadPoolExecutor.new(
              min_threads: 1,
              max_threads: 10,
              max_queue: 0,
            )

            @thread = Thread.new { run }

            return true
          end
        end

        def wakeup
          spawn || @nio.wakeup
        end

        def run
          loop do
            if @stopping
              @nio.close
              break
            end

            until @todo.empty?
              @todo.pop(true).call
            end

            next unless monitors = @nio.select

            monitors.each do |monitor|
              io = monitor.io
              stream = monitor.value

              begin
                if monitor.writable?
                  if stream.flush_write_buffer
                    monitor.interests = :r
                  end
                  next unless monitor.readable?
                end

                incoming = io.read_nonblock(4096, exception: false)
                case incoming
                when :wait_readable
                  next
                when nil
                  stream.close
                else
                  stream.receive incoming
                end
              rescue
                # We expect one of EOFError or Errno::ECONNRESET in
                # normal operation (when the client goes away). But if
                # anything else goes wrong, this is still the best way
                # to handle it.
                begin
                  stream.close
                rescue
                  @nio.deregister io
                  @map.delete io
                end
              end
            end
          end
        end
    end
  end
end