diff options
author | José Valim <jose.valim@gmail.com> | 2009-09-30 08:59:15 -0300 |
---|---|---|
committer | José Valim <jose.valim@gmail.com> | 2009-10-15 18:18:43 -0300 |
commit | a60bdd7d2921ca10b0a5ae3f750b402e12981004 (patch) | |
tree | 4c9f577be07200eebc38af96a3b12c2360d72f9a /activesupport/lib | |
parent | 8b340ab2f62bac2af9d5917e296bb4101530282a (diff) | |
download | rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.tar.gz rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.tar.bz2 rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.zip |
Added queue abstraction to Orchestra.
Diffstat (limited to 'activesupport/lib')
-rw-r--r-- | activesupport/lib/active_support/orchestra.rb | 57 |
1 files changed, 33 insertions, 24 deletions
diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb index efe30669d8..66097a7bec 100644 --- a/activesupport/lib/active_support/orchestra.rb +++ b/activesupport/lib/active_support/orchestra.rb @@ -29,8 +29,9 @@ module ActiveSupport # and is available at ActiveSupport::Orchestra::Listener. # module Orchestra + mattr_accessor :queue + @stacked_events = Hash.new { |h,k| h[k] = [] } - @listeners = [] def self.instrument(name, payload=nil) stack = @stacked_events[Thread.current.object_id] @@ -41,15 +42,11 @@ module ActiveSupport ensure event.finish! stack.delete(event) - @listeners.each { |s| s.push(event) } - end - - def self.register(listener) - @listeners << listener + queue.push(event) end - def self.unregister(listener) - @listeners.delete(listener) + def self.subscribe(pattern=nil, &block) + queue.subscribe(pattern, &block) end class Event @@ -69,35 +66,47 @@ module ActiveSupport end end - class Listener - attr_reader :mutex, :signaler, :thread - + # This is a default queue implementation that ships with Orchestra. It + # consumes events in a thread and publish them to all registered subscribers. + # + class LittleFanout def initialize - @mutex, @signaler = Mutex.new, ConditionVariable.new - @stream = [] + @listeners, @stream = [], [] + @thread = Thread.new do loop do - (event = @stream.shift) ? consume(event) : wait + (event = @stream.shift) ? consume(event) : Thread.stop end end end - def wait - @mutex.synchronize do - @signaler.wait(@mutex) - end + def push(event) + @stream.push(event) + @thread.run end - def push(event) - @mutex.synchronize do - @stream.push(event) - @signaler.broadcast - end + def subscribe(pattern=nil, &block) + @listeners << Listener.new(pattern, &block) end def consume(event) - raise NotImplementedError + @listeners.each { |l| l.publish(event) } + end + + class Listener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + end + + def publish(event) + unless @pattern && event.name.to_s !~ @pattern + @subscriber.call(event) + end + end end end end + + Orchestra.queue = Orchestra::LittleFanout.new end |