1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
require 'mutex_m'
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
# This class is thread safe. All methods are reentrant.
class Fanout
include Mutex_m
# Starts with no subscribers and an empty per-event-name listener cache,
# then chains to +super+ so ancestor initializers run as well.
def initialize
  @subscribers, @listeners_for = [], {}
  super
end
# Registers a listener and returns the subscriber object wrapping it.
#
# pattern - passed through to Subscribers.new; +nil+ means no pattern.
# block   - an explicit listener object. When omitted (or nil), the block
#           given to this method is used as the listener instead.
#
# Raises ArgumentError when neither a listener object nor a block is given.
#
# The block is captured through an explicit & parameter: the previous
# default of +Proc.new+ relied on implicit block capture, which Ruby 2.7
# deprecated and Ruby 3.0 turned into an ArgumentError.
def subscribe(pattern = nil, block = nil, &implicit_block)
  listener = block || implicit_block
  raise ArgumentError, "subscribe requires a listener object or a block" unless listener

  subscriber = Subscribers.new(pattern, listener)
  synchronize do
    @subscribers << subscriber
    # Cached listener lists may now be missing this subscriber.
    @listeners_for.clear
  end
  subscriber
end
# Drops every registered subscriber whose +matches?+ accepts +subscriber+
# (the argument is handed to +matches?+ as-is), then empties the listener
# cache so later lookups are recomputed.
def unsubscribe(subscriber)
  synchronize do
    @subscribers.delete_if { |registered| registered.matches?(subscriber) }
    @listeners_for.clear
  end
end
# Calls +start+ with the same arguments on every listener that
# +listeners_for+ returns for +name+.
def start(name, id, payload)
  listeners_for(name).each do |listener|
    listener.start(name, id, payload)
  end
end
# Calls +finish+ with the same arguments on every listener that
# +listeners_for+ returns for +name+.
def finish(name, id, payload)
  listeners_for(name).each do |listener|
    listener.finish(name, id, payload)
  end
end
# Calls +publish+ on every listener that +listeners_for+ returns for
# +name+, forwarding the name followed by any extra arguments.
def publish(name, *args)
  listeners_for(name).each do |listener|
    listener.publish(name, *args)
  end
end
# Returns the subscribers whose +subscribed_to?+ accepts +name+.
# The result is stored in @listeners_for under +name+ and reused on later
# calls; the lookup and the store happen inside +synchronize+.
def listeners_for(name)
  synchronize do
    cached = @listeners_for[name]
    if cached
      cached
    else
      @listeners_for[name] = @subscribers.select { |candidate| candidate.subscribed_to?(name) }
    end
  end
end
# True when +listeners_for+ yields at least one listener for +name+.
def listening?(name)
  interested = listeners_for(name)
  interested.any?
end
# This queue is synchronous, so there is never anything to wait for;
# the method does nothing and returns nil.
def wait
  nil
end
module Subscribers # :nodoc:
  # Builds the subscriber wrapper for +listener+.
  #
  # A listener responding to both +start+ and +finish+ is wrapped in
  # Evented; any other listener is wrapped in Timed, which invokes it
  # through +call+. When +pattern+ is nil (or false) the wrapper is wrapped
  # once more in AllMessages, whose +subscribed_to?+ accepts every name.
  def self.new(pattern, listener)
    wrapper_class =
      if listener.respond_to?(:start) && listener.respond_to?(:finish)
        Evented
      else
        Timed
      end
    subscriber = wrapper_class.new(pattern, listener)

    pattern ? subscriber : AllMessages.new(subscriber)
  end

  # Forwards start/finish notifications to a delegate that implements them.
  class Evented #:nodoc:
    def initialize(pattern, delegate)
      @pattern = pattern
      @delegate = delegate
      # Checked once here so publish does not probe the delegate per event.
      @can_publish = delegate.respond_to?(:publish)
    end

    # Forwards to the delegate's +publish+ when it has one. A delegate is
    # only required to implement +start+ and +finish+, so a missing
    # +publish+ is skipped (returning nil) instead of raising NoMethodError.
    def publish(name, *args)
      @delegate.publish(name, *args) if @can_publish
    end

    def start(name, id, payload)
      @delegate.start name, id, payload
    end

    def finish(name, id, payload)
      @delegate.finish name, id, payload
    end

    # Matches the pattern against the name's string form using ===.
    def subscribed_to?(name)
      @pattern === name.to_s
    end

    # True when given this very subscriber, or a value the pattern
    # accepts via ===.
    def matches?(subscriber_or_name)
      self === subscriber_or_name ||
        @pattern && @pattern === subscriber_or_name
    end
  end

  # Wraps a callable delegate: records a start time on +start+ and, on
  # +finish+, calls the delegate with (name, started, finished, id, payload).
  class Timed < Evented
    def initialize(pattern, delegate)
      super
      # Start times live in a Thread.current slot private to this
      # subscriber, so start/finish pairs running on different threads
      # cannot pop each other's timestamps.
      @timestack_key = :"_timestack_#{object_id}"
    end

    def publish(name, *args)
      @delegate.call name, *args
    end

    def start(name, id, payload)
      (Thread.current[@timestack_key] ||= []).push Time.now
    end

    def finish(name, id, payload)
      timestack = Thread.current[@timestack_key]
      # No matching start on this thread leaves +started+ nil.
      started = timestack.pop if timestack
      # Release the slot once balanced so long-lived threads do not keep
      # an empty stack around for every subscriber they ever served.
      Thread.current[@timestack_key] = nil if timestack && timestack.empty?
      @delegate.call(name, started, Time.now, id, payload)
    end
  end

  # Wrapper used when no pattern was given: forwards every call to the
  # wrapped subscriber and reports itself as subscribed to every name.
  class AllMessages # :nodoc:
    def initialize(delegate)
      @delegate = delegate
    end

    def start(name, id, payload)
      @delegate.start name, id, payload
    end

    def finish(name, id, payload)
      @delegate.finish name, id, payload
    end

    def publish(name, *args)
      @delegate.publish name, *args
    end

    def subscribed_to?(name)
      true
    end

    # There is no pattern to compare against, so matching is plain ===
    # on the wrapper itself.
    alias :matches? :===
  end
end
end
end
end
|