1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
require 'thread'
module ActiveSupport
# Orchestra provides an instrumentation API for Ruby. To instrument an action
# in Ruby you just need to:
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
# Those actions are consumed by listeners. A listener is anything that responds
# to push. You can even register an array:
#
# @listener = []
# ActiveSupport::Orchestra.register @listener
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
# event #=> ActiveSupport::Orchestra::Event
# event.name #=> :render
# event.duration #=> 10 (in miliseconds)
# event.result #=> "Foo"
# event.payload #=> { :extra => :information }
#
# Orchestra ships with a default listener implementation which puts events in
# a stream and consume them in a Thread. This implementation is thread safe
# and is available at ActiveSupport::Orchestra::Listener.
#
module Orchestra
@stacked_events = Hash.new { |h,k| h[k] = [] }
@listeners = []
def self.instrument(name, payload=nil)
stack = @stacked_events[Thread.current.object_id]
event = Event.new(name, stack.last, payload)
stack << event
event.result = yield
event
ensure
event.finish!
stack.delete(event)
@listeners.each { |s| s.push(event) }
end
def self.register(listener)
@listeners << listener
end
def self.unregister(listener)
@listeners.delete(listener)
end
class Event
attr_reader :name, :time, :duration, :parent, :thread_id, :payload
attr_accessor :result
def initialize(name, parent=nil, payload=nil)
@name = name
@time = Time.now
@thread_id = Thread.current.object_id
@parent = parent
@payload = payload
end
def finish!
@duration = 1000 * (Time.now.to_f - @time.to_f)
end
end
class Listener
attr_reader :mutex, :signaler, :thread
def initialize
@mutex, @signaler = Mutex.new, ConditionVariable.new
@stream = []
@thread = Thread.new do
loop do
(event = @stream.shift) ? consume(event) : wait
end
end
end
def wait
@mutex.synchronize do
@signaler.wait(@mutex)
end
end
def push(event)
@mutex.synchronize do
@stream.push(event)
@signaler.broadcast
end
end
def consume(event)
raise NotImplementedError
end
end
end
end
|