From a60bdd7d2921ca10b0a5ae3f750b402e12981004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Wed, 30 Sep 2009 08:59:15 -0300 Subject: Added queue abstraction to Orchestra. --- activesupport/lib/active_support/orchestra.rb | 57 ++++++++++++++++----------- 1 file changed, 33 insertions(+), 24 deletions(-) (limited to 'activesupport/lib/active_support') diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb index efe30669d8..66097a7bec 100644 --- a/activesupport/lib/active_support/orchestra.rb +++ b/activesupport/lib/active_support/orchestra.rb @@ -29,8 +29,9 @@ module ActiveSupport # and is available at ActiveSupport::Orchestra::Listener. # module Orchestra + mattr_accessor :queue + @stacked_events = Hash.new { |h,k| h[k] = [] } - @listeners = [] def self.instrument(name, payload=nil) stack = @stacked_events[Thread.current.object_id] @@ -41,15 +42,11 @@ module ActiveSupport ensure event.finish! stack.delete(event) - @listeners.each { |s| s.push(event) } - end - - def self.register(listener) - @listeners << listener + queue.push(event) end - def self.unregister(listener) - @listeners.delete(listener) + def self.subscribe(pattern=nil, &block) + queue.subscribe(pattern, &block) end class Event @@ -69,35 +66,47 @@ module ActiveSupport end end - class Listener - attr_reader :mutex, :signaler, :thread - + # This is a default queue implementation that ships with Orchestra. It + # consumes events in a thread and publish them to all registered subscribers. + # + class LittleFanout def initialize - @mutex, @signaler = Mutex.new, ConditionVariable.new - @stream = [] + @listeners, @stream = [], [] + @thread = Thread.new do loop do - (event = @stream.shift) ? consume(event) : wait + (event = @stream.shift) ? consume(event) : Thread.stop end end end - def wait - @mutex.synchronize do - @signaler.wait(@mutex) - end + def push(event) + @stream.push(event) + @thread.run end - def push(event) - @mutex.synchronize do - @stream.push(event) - @signaler.broadcast - end + def subscribe(pattern=nil, &block) + @listeners << Listener.new(pattern, &block) end def consume(event) - raise NotImplementedError + @listeners.each { |l| l.publish(event) } + end + + class Listener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + end + + def publish(event) + unless @pattern && event.name.to_s !~ @pattern + @subscriber.call(event) + end + end end end end + + Orchestra.queue = Orchestra::LittleFanout.new end -- cgit v1.2.3