From 7b5225a529cb9693f3bed8e6023de0d5348efca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Thu, 1 Oct 2009 19:00:22 -0300 Subject: Abstract publishing, subscribing and instrumenting in Orchestra. --- activesupport/lib/active_support/orchestra.rb | 74 +++++++++++++++++++++------ 1 file changed, 59 insertions(+), 15 deletions(-) (limited to 'activesupport') diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb index 66097a7bec..f41f55877b 100644 --- a/activesupport/lib/active_support/orchestra.rb +++ b/activesupport/lib/active_support/orchestra.rb @@ -1,4 +1,5 @@ require 'thread' +require 'active_support/core_ext/module/delegation' module ActiveSupport # Orchestra provides an instrumentation API for Ruby. To instrument an action @@ -31,22 +32,65 @@ module ActiveSupport module Orchestra mattr_accessor :queue - @stacked_events = Hash.new { |h,k| h[k] = [] } - - def self.instrument(name, payload=nil) - stack = @stacked_events[Thread.current.object_id] - event = Event.new(name, stack.last, payload) - stack << event - event.result = yield - event - ensure - event.finish! - stack.delete(event) - queue.push(event) + class << self + delegate :instrument, :to => :instrumenter + + def instrumenter + Thread.current[:orchestra_instrumeter] ||= Instrumenter.new(publisher) + end + + def publisher + @publisher ||= Publisher.new(queue) + end + + def subscribe(pattern=nil, &block) + Subscriber.new(queue).bind(pattern).subscribe(&block) + end + end + + class Instrumenter + def initialize(publisher) + @publisher = publisher + @stack = [] + end + + def instrument(name, payload=nil) + event = Event.new(name, @stack.last, payload) + @stack << event + event.result = yield + event + ensure + event.finish! + @stack.pop + @publisher.publish(event) + end end - def self.subscribe(pattern=nil, &block) - queue.subscribe(pattern, &block) + class Publisher + def initialize(queue) + @queue = queue + end + + def publish(event) + @queue.publish(event) + end + end + + class Subscriber + def initialize(queue) + @queue = queue + end + + def bind(pattern) + @pattern = pattern + self + end + + def subscribe + @queue.subscribe(@pattern) do |event| + yield event + end + end end class Event @@ -80,7 +124,7 @@ module ActiveSupport end end - def push(event) + def publish(event) @stream.push(event) @thread.run end -- cgit v1.2.3