1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
|
gem 'pg', '~> 0.18'
require 'pg'
require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
def initialize(*)
super
@listener = nil
end
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
end
end
def subscribe(channel, callback, success_callback = nil)
listener.add_subscriber(channel, callback, success_callback)
end
def unsubscribe(channel, callback)
listener.remove_subscriber(channel, callback)
end
def shutdown
listener.shutdown
end
def with_connection(&block) # :nodoc:
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
pg_conn = ar_conn.raw_connection
unless pg_conn.is_a?(PG::Connection)
raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
end
yield pg_conn
end
end
private
def listener
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
class Listener < SubscriberMap
def initialize(adapter)
super()
@adapter = adapter
@queue = Queue.new
@thread = Thread.new do
Thread.current.abort_on_exception = true
listen
end
end
def listen
@adapter.with_connection do |pg_conn|
catch :shutdown do
loop do
until @queue.empty?
action, channel, callback = @queue.pop(true)
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
Concurrent.global_io_executor << callback if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
throw :shutdown
end
end
pg_conn.wait_for_notify(1) do |chan, pid, message|
broadcast(chan, message)
end
end
end
end
end
def shutdown
@queue.push([:shutdown])
Thread.pass while @thread.alive?
end
def add_channel(channel, on_success)
@queue.push([:listen, channel, on_success])
end
def remove_channel(channel)
@queue.push([:unlisten, channel])
end
def invoke_callback(*)
Concurrent.global_io_executor.post { super }
end
end
end
end
end
|