aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter/redis.rb
blob: ba4934a264aea2a8b9545ee87f64171a15360430 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
require 'thread'

gem 'redis', '~> 3.0'
require 'redis'

module ActionCable
  module SubscriptionAdapter
    class Redis < Base # :nodoc:
      # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
      # This is needed, for example, when using Makara proxies for distributed Redis.
      cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }

      def initialize(*)
        super
        @listener = nil
        @redis_connection_for_broadcasts = nil
      end

      def broadcast(channel, payload)
        redis_connection_for_broadcasts.publish(channel, payload)
      end

      def subscribe(channel, callback, success_callback = nil)
        listener.add_subscriber(channel, callback, success_callback)
      end

      def unsubscribe(channel, callback)
        listener.remove_subscriber(channel, callback)
      end

      def shutdown
        @listener.shutdown if @listener
      end

      def redis_connection_for_subscriptions
        ::Redis.new(@server.config.cable)
      end

      private
        def listener
          @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
        end

        def redis_connection_for_broadcasts
          @redis_connection_for_broadcasts || @server.mutex.synchronize do
            @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
          end
        end

        class Listener < SubscriberMap
          def initialize(adapter)
            super()

            @adapter = adapter

            @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
            @subscription_lock = Mutex.new

            @raw_client = nil

            @when_connected = []

            @thread = nil
          end

          def listen(conn)
            conn.without_reconnect do
              original_client = conn.client

              conn.subscribe('_action_cable_internal') do |on|
                on.subscribe do |chan, count|
                  @subscription_lock.synchronize do
                    if count == 1
                      @raw_client = original_client

                      until @when_connected.empty?
                        @when_connected.shift.call
                      end
                    end

                    if callbacks = @subscribe_callbacks[chan]
                      next_callback = callbacks.shift
                      Concurrent.global_io_executor << next_callback if next_callback
                      @subscribe_callbacks.delete(chan) if callbacks.empty?
                    end
                  end
                end

                on.message do |chan, message|
                  broadcast(chan, message)
                end

                on.unsubscribe do |chan, count|
                  if count == 0
                    @subscription_lock.synchronize do
                      @raw_client = nil
                    end
                  end
                end
              end
            end
          end

          def shutdown
            @subscription_lock.synchronize do
              return if @thread.nil?

              when_connected do
                send_command('unsubscribe')
                @raw_client = nil
              end
            end

            Thread.pass while @thread.alive?
          end

          def add_channel(channel, on_success)
            @subscription_lock.synchronize do
              ensure_listener_running
              @subscribe_callbacks[channel] << on_success
              when_connected { send_command('subscribe', channel) }
            end
          end

          def remove_channel(channel)
            @subscription_lock.synchronize do
              when_connected { send_command('unsubscribe', channel) }
            end
          end

          def invoke_callback(*)
            Concurrent.global_io_executor.post { super }
          end

          private
            def ensure_listener_running
              @thread ||= Thread.new do
                Thread.current.abort_on_exception = true

                conn = @adapter.redis_connection_for_subscriptions
                listen conn
              end
            end

            def when_connected(&block)
              if @raw_client
                block.call
              else
                @when_connected << block
              end
            end

            def send_command(*command)
              @raw_client.write(command)

              very_raw_connection =
                @raw_client.connection.instance_variable_defined?(:@connection) &&
                @raw_client.connection.instance_variable_get(:@connection)

              if very_raw_connection && very_raw_connection.respond_to?(:flush)
                very_raw_connection.flush
              end
            end
        end
    end
  end
end