1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
module ActiveSupport
  module Notifications
    # This is a default queue implementation that ships with Notifications.
    # It just pushes events to all registered log subscribers.
    #
    # Delivery is synchronous: +start+, +finish+ and +publish+ invoke every
    # matching subscriber before returning.
    class Fanout
      def initialize
        @subscribers = []
        # Memoized lookup of event name => matching subscribers. It is
        # cleared whenever the subscriber list changes.
        @listeners_for = {}
      end

      # Registers a listener and returns the subscriber object wrapping it
      # (pass that object to +unsubscribe+ to remove it again).
      #
      # +pattern+ is matched against event names with <tt>===</tt>; +nil+
      # subscribes to every event. The listener is either the second
      # positional argument or the given block. Listeners responding to
      # +call+ are invoked once per event with timing information; any
      # other listener receives +start+ / +finish+ calls.
      #
      # Raises ArgumentError when neither a listener nor a block is given.
      def subscribe(pattern = nil, block = nil, &callback)
        # Capture the block explicitly: a block-less Proc.new no longer
        # picks up the caller's block on Ruby 3.0+.
        block ||= callback
        raise ArgumentError, "no listener or block given" unless block

        subscriber = Subscribers.new(pattern, block)
        @subscribers << subscriber
        @listeners_for.clear
        subscriber
      end

      # Removes every subscriber for which <tt>matches?(subscriber)</tt> is
      # true. +subscriber+ may be an object returned by +subscribe+ or a
      # value the subscriber's pattern matches (e.g. an event name).
      def unsubscribe(subscriber)
        @subscribers.reject! { |s| s.matches?(subscriber) }
        @listeners_for.clear
      end

      # Signals the beginning of event +name+ to all matching subscribers.
      def start(name, id, payload)
        listeners_for(name).each { |s| s.start(name, id, payload) }
      end

      # Signals the end of event +name+ to all matching subscribers.
      def finish(name, id, payload)
        listeners_for(name).each { |s| s.finish(name, id, payload) }
      end

      # Delivers an already complete event to all matching subscribers.
      def publish(name, *args)
        listeners_for(name).each { |s| s.publish(name, *args) }
      end

      # Returns (and memoizes per name) the subscribers interested in +name+.
      def listeners_for(name)
        @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
      end

      # True when at least one subscriber is interested in +name+.
      def listening?(name)
        listeners_for(name).any?
      end

      # This is a sync queue, so there is no waiting.
      def wait
      end

      module Subscribers # :nodoc:
        # Builds the right subscriber for +listener+: Timed for callables,
        # Evented otherwise, wrapped in AllMessages when no pattern is given.
        def self.new(pattern, listener)
          subscriber_class = listener.respond_to?(:call) ? Timed : Evented
          subscriber = subscriber_class.new(pattern, listener)

          pattern ? subscriber : AllMessages.new(subscriber)
        end

        class Evented #:nodoc:
          def initialize(pattern, delegate)
            @pattern = pattern
            @delegate = delegate
            # Checked once up front so publish can skip listeners that only
            # implement start/finish.
            @can_publish = delegate.respond_to?(:publish)
          end

          # Forwards a complete event to the delegate when it supports
          # +publish+; otherwise does nothing instead of raising
          # NoMethodError.
          def publish(name, *args)
            @delegate.publish(name, *args) if @can_publish
          end

          def start(name, id, payload)
            @delegate.start name, id, payload
          end

          def finish(name, id, payload)
            @delegate.finish name, id, payload
          end

          def subscribed_to?(name)
            @pattern === name.to_s
          end

          # True for this very subscriber, or for anything its pattern
          # matches.
          def matches?(subscriber_or_name)
            self === subscriber_or_name ||
              @pattern && @pattern === subscriber_or_name
          end
        end

        class Timed < Evented
          def initialize(pattern, delegate)
            # id => { event name => stack of start times }; a stack per name
            # so nested events of the same name pair up correctly.
            @timestack = Hash.new { |h, id|
              h[id] = Hash.new { |ids, name| ids[name] = [] }
            }
            super
          end

          def publish(name, *args)
            @delegate.call name, *args
          end

          def start(name, id, payload)
            @timestack[id][name].push Time.now
          end

          # Calls the delegate with (name, started, finished, id, payload).
          # +started+ is nil when there was no matching +start+.
          def finish(name, id, payload)
            started = pop_start_time(name, id)
            @delegate.call(name, started, Time.now, id, payload)
          end

          private

          # Pops the most recent start time for (id, name) and drops stacks
          # that became empty, so @timestack does not grow with every id
          # and name ever seen.
          def pop_start_time(name, id)
            stacks = @timestack[id]
            stack = stacks[name]
            started = stack.pop
            if stack.empty?
              stacks.delete(name)
              @timestack.delete(id) if stacks.empty?
            end
            started
          end
        end

        # Wraps another subscriber so that it receives every event,
        # regardless of name.
        class AllMessages # :nodoc:
          def initialize(delegate)
            @delegate = delegate
          end

          def start(name, id, payload)
            @delegate.start name, id, payload
          end

          def finish(name, id, payload)
            @delegate.finish name, id, payload
          end

          def publish(name, *args)
            @delegate.publish name, *args
          end

          def subscribed_to?(name)
            true
          end

          alias :matches? :===
        end
      end
    end
  end
end
|