1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
require 'thread'
gem 'redis', '~> 3.0'
require 'redis'
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
# Factory used to build Redis connections. Overwrite it if you want to use a
# different Redis client library than ::Redis. This is needed, for example,
# when using Makara proxies for distributed Redis.
#
# The callable receives the cable configuration and must return the
# connection. The default builds a ::Redis client from the configuration's
# :url entry only.
cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
# Forwards every argument to the superclass initializer, then starts with no
# broadcast connection and no listener. Both are created lazily by the
# private accessors of the same names.
def initialize(*)
  super
  @redis_connection_for_broadcasts = nil
  @listener = nil
end
# Publishes +payload+ on +channel+ through the memoized broadcast connection
# and returns whatever that connection's +publish+ returns.
def broadcast(channel, payload)
  connection = redis_connection_for_broadcasts
  connection.publish(channel, payload)
end
# Registers +callback+ as a subscriber of +channel+ on the (lazily created)
# listener. +success_callback+ is optional and handed to the listener
# unchanged; presumably it fires once the subscription is confirmed, but that
# depends on SubscriberMap#add_subscriber, which is not defined in this file.
def subscribe(channel, callback, success_callback = nil)
  subscriptions = listener
  subscriptions.add_subscriber(channel, callback, success_callback)
end
# Removes +callback+ from the subscribers of +channel+ by delegating to the
# listener's +remove_subscriber+.
def unsubscribe(channel, callback)
  subscriptions = listener
  subscriptions.remove_subscriber(channel, callback)
end
# Shuts the listener down if one was ever created. Does nothing (and returns
# nil) when no listener exists, so no listener is created just to stop it.
def shutdown
  return unless @listener

  @listener.shutdown
end
# Builds a new Redis connection for the listener to subscribe on.
#
# A fresh connection is returned on every call; nothing is memoized here.
# The connection is created through the +redis_connector+ factory, the same
# one the broadcast connection uses, so that a custom connector (for example
# a different Redis client library) is honoured for subscriptions too instead
# of being bypassed by a hard-coded ::Redis.new.
#
# Note: the connector only sees what it chooses to read from the cable
# configuration; the default connector forwards just the :url entry.
def redis_connection_for_subscriptions
  self.class.redis_connector.call(@server.config.cable)
end
private
# Returns the adapter's Listener, creating it on first use. The instance
# variable is checked once without the lock and, if unset, again under
# @server.mutex so only one Listener is assigned.
def listener
  return @listener if @listener

  @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
# Returns the connection used for publishing, building it on first use via
# the class-level +redis_connector+ with the cable configuration. Creation
# happens under @server.mutex and the result is memoized.
def redis_connection_for_broadcasts
  return @redis_connection_for_broadcasts if @redis_connection_for_broadcasts

  @server.mutex.synchronize do
    @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
  end
end
class Listener < SubscriberMap
# Prepares listener state for +adapter+ without connecting to anything.
#
# @adapter             - supplies the subscription connection.
# @thread              - listener thread, started lazily.
# @raw_client          - client captured in #listen; nil while not connected.
# @when_connected      - blocks deferred until @raw_client is available.
# @subscription_lock   - guards the state above.
# @subscribe_callbacks - per-channel queues of pending success callbacks,
#                        with a fresh array created for each new channel.
def initialize(adapter)
  super()
  @adapter = adapter
  @thread = nil
  @raw_client = nil
  @when_connected = []
  @subscription_lock = Mutex.new
  @subscribe_callbacks = Hash.new { |queues, channel| queues[channel] = [] }
end
# Subscribes +conn+ to the internal channel and installs the handlers that
# drive this listener:
#
# * subscribe   - on the first confirmation (count == 1) records the raw
#                 client and runs every block queued in @when_connected; then
#                 hands the next pending success callback for the channel to
#                 Concurrent.global_io_executor.
# * message     - forwards the channel and message to #broadcast.
# * unsubscribe - clears @raw_client once the count drops to 0.
#
# All state changes happen under @subscription_lock.
def listen(conn)
  conn.without_reconnect do
    # Read the client before subscribing.
    # NOTE(review): presumably conn.client differs while subscribed - confirm against redis-rb.
    client = conn.client

    conn.subscribe('_action_cable_internal') do |on|
      on.subscribe do |channel, subscription_count|
        @subscription_lock.synchronize do
          if subscription_count == 1
            @raw_client = client
            # Flush everything that was waiting for a usable connection.
            @when_connected.shift.call until @when_connected.empty?
          end

          pending = @subscribe_callbacks[channel]
          if pending
            on_success = pending.shift
            Concurrent.global_io_executor << on_success if on_success
            # Drop the queue once it is empty so the hash does not keep empty entries.
            @subscribe_callbacks.delete(channel) if pending.empty?
          end
        end
      end

      on.message { |channel, message| broadcast(channel, message) }

      on.unsubscribe do |_channel, remaining|
        @subscription_lock.synchronize { @raw_client = nil } if remaining == 0
      end
    end
  end
end
# Stops the listener thread, if one was started.
#
# Under the lock: returns immediately when no thread exists; otherwise sends
# a bare 'unsubscribe' command and clears @raw_client (deferred until
# connected if the connection is not up yet). After releasing the lock it
# yields the CPU until the thread has finished.
# NOTE(review): this relies on the bare 'unsubscribe' making the subscribe
# call in #listen return - confirm against redis-rb.
def shutdown
  @subscription_lock.synchronize do
    return unless @thread

    when_connected do
      send_command('unsubscribe')
      @raw_client = nil
    end
  end

  Thread.pass while @thread.alive?
end
# Starts listening on +channel+. Under the lock this makes sure the listener
# thread is running, queues +on_success+ as a pending callback for the
# channel, and sends (or defers until connected) the 'subscribe' command.
def add_channel(channel, on_success)
  @subscription_lock.synchronize do
    ensure_listener_running
    # Queue the callback before the command can go out, so the confirmation
    # handler always finds it.
    @subscribe_callbacks[channel].push(on_success)
    when_connected do
      send_command('subscribe', channel)
    end
  end
end
# Stops listening on +channel+: under the lock, sends the 'unsubscribe'
# command for it, or defers that until the connection is available.
def remove_channel(channel)
  @subscription_lock.synchronize do
    when_connected do
      send_command('unsubscribe', channel)
    end
  end
end
# Runs the inherited +invoke_callback+ (with the same arguments) as a job on
# Concurrent.global_io_executor rather than inline, and returns what +post+
# returns.
# NOTE(review): presumably this keeps subscriber callbacks off the listener thread - confirm.
def invoke_callback(*)
  executor = Concurrent.global_io_executor
  executor.post { super }
end
private
# Starts the listener thread once and returns it; later calls return the
# existing thread. The thread obtains a subscription connection from the
# adapter and runs #listen on it. abort_on_exception is enabled on the thread
# so a failure in it is not silently lost.
def ensure_listener_running
  return @thread if @thread

  @thread = Thread.new do
    Thread.current.abort_on_exception = true

    listen(@adapter.redis_connection_for_subscriptions)
  end
end
# Runs +block+ immediately when a raw client is available; otherwise appends
# it to @when_connected for later. Callers hold @subscription_lock around
# this call.
def when_connected(&block)
  return block.call if @raw_client

  @when_connected << block
end
# Writes +command+ (an array of command parts) to the raw client, then
# flushes the underlying connection when one can be found.
#
# The inner connection is reached through the @connection instance variable
# of @raw_client.connection; if that variable is missing, nil, or does not
# respond to +flush+, nothing more happens and nil is returned.
def send_command(*command)
  @raw_client.write(command)

  driver = @raw_client.connection
  return unless driver.instance_variable_defined?(:@connection)

  socket = driver.instance_variable_get(:@connection)
  socket.flush if socket && socket.respond_to?(:flush)
end
end
end
end
end
|