aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@gmail.com>2009-09-30 08:59:15 -0300
committerJosé Valim <jose.valim@gmail.com>2009-10-15 18:18:43 -0300
commita60bdd7d2921ca10b0a5ae3f750b402e12981004 (patch)
tree4c9f577be07200eebc38af96a3b12c2360d72f9a /activesupport/lib/active_support
parent8b340ab2f62bac2af9d5917e296bb4101530282a (diff)
downloadrails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.tar.gz
rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.tar.bz2
rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.zip
Added queue abstraction to Orchestra.
Diffstat (limited to 'activesupport/lib/active_support')
-rw-r--r--activesupport/lib/active_support/orchestra.rb57
1 files changed, 33 insertions, 24 deletions
diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb
index efe30669d8..66097a7bec 100644
--- a/activesupport/lib/active_support/orchestra.rb
+++ b/activesupport/lib/active_support/orchestra.rb
@@ -29,8 +29,9 @@ module ActiveSupport
# and is available at ActiveSupport::Orchestra::Listener.
#
module Orchestra
+ mattr_accessor :queue
+
@stacked_events = Hash.new { |h,k| h[k] = [] }
- @listeners = []
def self.instrument(name, payload=nil)
stack = @stacked_events[Thread.current.object_id]
@@ -41,15 +42,11 @@ module ActiveSupport
ensure
event.finish!
stack.delete(event)
- @listeners.each { |s| s.push(event) }
- end
-
- def self.register(listener)
- @listeners << listener
+ queue.push(event)
end
- def self.unregister(listener)
- @listeners.delete(listener)
+ def self.subscribe(pattern=nil, &block)
+ queue.subscribe(pattern, &block)
end
class Event
@@ -69,35 +66,47 @@ module ActiveSupport
end
end
- class Listener
- attr_reader :mutex, :signaler, :thread
-
+ # This is a default queue implementation that ships with Orchestra. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class LittleFanout
def initialize
- @mutex, @signaler = Mutex.new, ConditionVariable.new
- @stream = []
+ @listeners, @stream = [], []
+
@thread = Thread.new do
loop do
- (event = @stream.shift) ? consume(event) : wait
+ (event = @stream.shift) ? consume(event) : Thread.stop
end
end
end
- def wait
- @mutex.synchronize do
- @signaler.wait(@mutex)
- end
+ def push(event)
+ @stream.push(event)
+ @thread.run
end
- def push(event)
- @mutex.synchronize do
- @stream.push(event)
- @signaler.broadcast
- end
+ def subscribe(pattern=nil, &block)
+ @listeners << Listener.new(pattern, &block)
end
def consume(event)
- raise NotImplementedError
+ @listeners.each { |l| l.publish(event) }
+ end
+
+ class Listener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ end
+
+ def publish(event)
+ unless @pattern && event.name.to_s !~ @pattern
+ @subscriber.call(event)
+ end
+ end
end
end
end
+
+ Orchestra.queue = Orchestra::LittleFanout.new
end