1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# frozen_string_literal: true
gem "pg", ">= 0.18", "< 2.0"
require "pg"
require "thread"
require "digest/sha1"
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
def initialize(*)
super
@listener = nil
end
def broadcast(channel, payload)
with_broadcast_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel_identifier(channel))}, '#{pg_conn.escape_string(payload)}'")
end
end
def subscribe(channel, callback, success_callback = nil)
listener.add_subscriber(channel_identifier(channel), callback, success_callback)
end
def unsubscribe(channel, callback)
listener.remove_subscriber(channel_identifier(channel), callback)
end
def shutdown
listener.shutdown
end
def with_subscriptions_connection(&block) # :nodoc:
ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn|
# Action Cable is taking ownership over this database connection, and
# will perform the necessary cleanup tasks
ActiveRecord::Base.connection_pool.remove(conn)
end
pg_conn = ar_conn.raw_connection
verify!(pg_conn)
yield pg_conn
ensure
ar_conn.disconnect!
end
def with_broadcast_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection
verify!(pg_conn)
yield pg_conn
end
end
private
def channel_identifier(channel)
channel.size > 63 ? Digest::SHA1.hexdigest(channel) : channel
end
def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def verify!(pg_conn)
unless pg_conn.is_a?(PG::Connection)
raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter"
end
end
class Listener < SubscriberMap
def initialize(adapter, event_loop)
super()
@adapter = adapter
@event_loop = event_loop
@queue = Queue.new
@thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
end
def listen
@adapter.with_subscriptions_connection do |pg_conn|
catch :shutdown do
loop do
until @queue.empty?
action, channel, callback = @queue.pop(true)
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
@event_loop.post(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
throw :shutdown
end
end
pg_conn.wait_for_notify(1) do |chan, pid, message|
broadcast(chan, message)
end
end
end
end
end
def shutdown
@queue.push([:shutdown])
Thread.pass while @thread.alive?
end
def add_channel(channel, on_success)
@queue.push([:listen, channel, on_success])
end
def remove_channel(channel)
@queue.push([:unlisten, channel])
end
def invoke_callback(*)
@event_loop.post { super }
end
end
end
end
end
|