aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--activesupport/lib/active_support/orchestra.rb74
1 files changed, 59 insertions, 15 deletions
diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb
index 66097a7bec..f41f55877b 100644
--- a/activesupport/lib/active_support/orchestra.rb
+++ b/activesupport/lib/active_support/orchestra.rb
@@ -1,4 +1,5 @@
require 'thread'
+require 'active_support/core_ext/module/delegation'
module ActiveSupport
# Orchestra provides an instrumentation API for Ruby. To instrument an action
@@ -31,22 +32,65 @@ module ActiveSupport
module Orchestra
mattr_accessor :queue
- @stacked_events = Hash.new { |h,k| h[k] = [] }
-
- def self.instrument(name, payload=nil)
- stack = @stacked_events[Thread.current.object_id]
- event = Event.new(name, stack.last, payload)
- stack << event
- event.result = yield
- event
- ensure
- event.finish!
- stack.delete(event)
- queue.push(event)
+ class << self
+ delegate :instrument, :to => :instrumenter
+
+ def instrumenter
+ Thread.current[:orchestra_instrumeter] ||= Instrumenter.new(publisher)
+ end
+
+ def publisher
+ @publisher ||= Publisher.new(queue)
+ end
+
+ def subscribe(pattern=nil, &block)
+ Subscriber.new(queue).bind(pattern).subscribe(&block)
+ end
+ end
+
+ class Instrumenter
+ def initialize(publisher)
+ @publisher = publisher
+ @stack = []
+ end
+
+ def instrument(name, payload=nil)
+ event = Event.new(name, @stack.last, payload)
+ @stack << event
+ event.result = yield
+ event
+ ensure
+ event.finish!
+ @stack.pop
+ @publisher.publish(event)
+ end
end
- def self.subscribe(pattern=nil, &block)
- queue.subscribe(pattern, &block)
+ class Publisher
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def publish(event)
+ @queue.publish(event)
+ end
+ end
+
+ class Subscriber
+ def initialize(queue)
+ @queue = queue
+ end
+
+ def bind(pattern)
+ @pattern = pattern
+ self
+ end
+
+ def subscribe
+ @queue.subscribe(@pattern) do |event|
+ yield event
+ end
+ end
end
class Event
@@ -80,7 +124,7 @@ module ActiveSupport
end
end
- def push(event)
+ def publish(event)
@stream.push(event)
@thread.run
end