aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@gmail.com>2009-09-30 08:59:15 -0300
committerJosé Valim <jose.valim@gmail.com>2009-10-15 18:18:43 -0300
commita60bdd7d2921ca10b0a5ae3f750b402e12981004 (patch)
tree4c9f577be07200eebc38af96a3b12c2360d72f9a
parent8b340ab2f62bac2af9d5917e296bb4101530282a (diff)
downloadrails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.tar.gz
rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.tar.bz2
rails-a60bdd7d2921ca10b0a5ae3f750b402e12981004.zip
Added queue abstraction to Orchestra.
-rw-r--r--activesupport/lib/active_support/orchestra.rb57
-rw-r--r--activesupport/test/orchestra_test.rb111
2 files changed, 80 insertions, 88 deletions
diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb
index efe30669d8..66097a7bec 100644
--- a/activesupport/lib/active_support/orchestra.rb
+++ b/activesupport/lib/active_support/orchestra.rb
@@ -29,8 +29,9 @@ module ActiveSupport
# and is available at ActiveSupport::Orchestra::Listener.
#
module Orchestra
+ mattr_accessor :queue
+
@stacked_events = Hash.new { |h,k| h[k] = [] }
- @listeners = []
def self.instrument(name, payload=nil)
stack = @stacked_events[Thread.current.object_id]
@@ -41,15 +42,11 @@ module ActiveSupport
ensure
event.finish!
stack.delete(event)
- @listeners.each { |s| s.push(event) }
- end
-
- def self.register(listener)
- @listeners << listener
+ queue.push(event)
end
- def self.unregister(listener)
- @listeners.delete(listener)
+ def self.subscribe(pattern=nil, &block)
+ queue.subscribe(pattern, &block)
end
class Event
@@ -69,35 +66,47 @@ module ActiveSupport
end
end
- class Listener
- attr_reader :mutex, :signaler, :thread
-
+ # This is a default queue implementation that ships with Orchestra. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class LittleFanout
def initialize
- @mutex, @signaler = Mutex.new, ConditionVariable.new
- @stream = []
+ @listeners, @stream = [], []
+
@thread = Thread.new do
loop do
- (event = @stream.shift) ? consume(event) : wait
+ (event = @stream.shift) ? consume(event) : Thread.stop
end
end
end
- def wait
- @mutex.synchronize do
- @signaler.wait(@mutex)
- end
+ def push(event)
+ @stream.push(event)
+ @thread.run
end
- def push(event)
- @mutex.synchronize do
- @stream.push(event)
- @signaler.broadcast
- end
+ def subscribe(pattern=nil, &block)
+ @listeners << Listener.new(pattern, &block)
end
def consume(event)
- raise NotImplementedError
+ @listeners.each { |l| l.publish(event) }
+ end
+
+ class Listener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ end
+
+ def publish(event)
+ unless @pattern && event.name.to_s !~ @pattern
+ @subscriber.call(event)
+ end
+ end
end
end
end
+
+ Orchestra.queue = Orchestra::LittleFanout.new
end
diff --git a/activesupport/test/orchestra_test.rb b/activesupport/test/orchestra_test.rb
index 683cc36f6a..e343d6322b 100644
--- a/activesupport/test/orchestra_test.rb
+++ b/activesupport/test/orchestra_test.rb
@@ -1,5 +1,12 @@
require 'abstract_unit'
+# Allow LittleFanout to be cleaned.
+class ActiveSupport::Orchestra::LittleFanout
+ def clear
+ @listeners.clear
+ end
+end
+
class OrchestraEventTest < Test::Unit::TestCase
def setup
@parent = ActiveSupport::Orchestra::Event.new(:parent)
@@ -34,12 +41,12 @@ end
class OrchestraMainTest < Test::Unit::TestCase
def setup
- @listener = []
- ActiveSupport::Orchestra.register @listener
+ @events = []
+ ActiveSupport::Orchestra.subscribe { |event| @events << event }
end
def teardown
- ActiveSupport::Orchestra.unregister @listener
+ ActiveSupport::Orchestra.queue.clear
end
def test_orchestra_allows_any_action_to_be_instrumented
@@ -65,9 +72,9 @@ class OrchestraMainTest < Test::Unit::TestCase
1 + 1
end
- assert_equal 1, @listener.size
- assert_equal :awesome, @listener.last.name
- assert_equal "orchestra", @listener.last.payload
+ assert_equal 1, @events.size
+ assert_equal :awesome, @events.last.name
+ assert_equal "orchestra", @events.last.payload
end
def test_nested_events_can_be_instrumented
@@ -76,18 +83,18 @@ class OrchestraMainTest < Test::Unit::TestCase
sleep(0.1)
end
- assert_equal 1, @listener.size
- assert_equal :wot, @listener.first.name
- assert_equal "child", @listener.first.payload
+ assert_equal 1, @events.size
+ assert_equal :wot, @events.first.name
+ assert_equal "child", @events.first.payload
- assert_nil @listener.first.parent.duration
- assert_in_delta 100, @listener.first.duration, 30
+ assert_nil @events.first.parent.duration
+ assert_in_delta 100, @events.first.duration, 30
end
- assert_equal 2, @listener.size
- assert_equal :awesome, @listener.last.name
- assert_equal "orchestra", @listener.last.payload
- assert_in_delta 100, @listener.first.parent.duration, 30
+ assert_equal 2, @events.size
+ assert_equal :awesome, @events.last.name
+ assert_equal "orchestra", @events.last.payload
+ assert_in_delta 100, @events.first.parent.duration, 30
end
def test_event_is_pushed_even_if_block_fails
@@ -95,67 +102,43 @@ class OrchestraMainTest < Test::Unit::TestCase
raise "OMG"
end rescue RuntimeError
- assert_equal 1, @listener.size
- assert_equal :awesome, @listener.last.name
- assert_equal "orchestra", @listener.last.payload
+ assert_equal 1, @events.size
+ assert_equal :awesome, @events.last.name
+ assert_equal "orchestra", @events.last.payload
end
-end
-
-class OrchestraListenerTest < Test::Unit::TestCase
- class MyListener < ActiveSupport::Orchestra::Listener
- attr_reader :consumed
- def consume(event)
- @consumed ||= []
- @consumed << event
- end
- end
+ def test_subscriber_with_pattern
+ @another = []
+ ActiveSupport::Orchestra.subscribe(/cache/) { |event| @another << event }
- def setup
- @listener = MyListener.new
- ActiveSupport::Orchestra.register @listener
- end
+ ActiveSupport::Orchestra.instrument(:something){ 0 }
+ ActiveSupport::Orchestra.instrument(:cache){ 10 }
- def teardown
- ActiveSupport::Orchestra.unregister @listener
- end
-
- def test_thread_is_exposed_by_listener
- assert_kind_of Thread, @listener.thread
- end
-
- def test_event_is_consumed_when_an_action_is_instrumented
- ActiveSupport::Orchestra.instrument(:sum) do
- 1 + 1
- end
sleep 0.1
- assert_equal 1, @listener.consumed.size
- assert_equal :sum, @listener.consumed.first.name
- assert_equal 2, @listener.consumed.first.result
+
+ assert_equal 1, @another.size
+ assert_equal :cache, @another.first.name
+ assert_equal 10, @another.first.result
end
- def test_with_sevaral_consumers_and_several_events
- @another = MyListener.new
- ActiveSupport::Orchestra.register @another
+ def test_with_several_consumers_and_several_events
+ @another = []
+ ActiveSupport::Orchestra.subscribe { |event| @another << event }
1.upto(100) do |i|
- ActiveSupport::Orchestra.instrument(:value) do
- i
- end
+ ActiveSupport::Orchestra.instrument(:value){ i }
end
sleep 0.1
- assert_equal 100, @listener.consumed.size
- assert_equal :value, @listener.consumed.first.name
- assert_equal 1, @listener.consumed.first.result
- assert_equal 100, @listener.consumed.last.result
-
- assert_equal 100, @another.consumed.size
- assert_equal :value, @another.consumed.first.name
- assert_equal 1, @another.consumed.first.result
- assert_equal 100, @another.consumed.last.result
- ensure
- ActiveSupport::Orchestra.unregister @another
+ assert_equal 100, @events.size
+ assert_equal :value, @events.first.name
+ assert_equal 1, @events.first.result
+ assert_equal 100, @events.last.result
+
+ assert_equal 100, @another.size
+ assert_equal :value, @another.first.name
+ assert_equal 1, @another.first.result
+ assert_equal 100, @another.last.result
end
end