From a60bdd7d2921ca10b0a5ae3f750b402e12981004 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Wed, 30 Sep 2009 08:59:15 -0300 Subject: Added queue abstraction to Orchestra. --- activesupport/lib/active_support/orchestra.rb | 57 +++++++------ activesupport/test/orchestra_test.rb | 111 +++++++++++--------------- 2 files changed, 80 insertions(+), 88 deletions(-) (limited to 'activesupport') diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb index efe30669d8..66097a7bec 100644 --- a/activesupport/lib/active_support/orchestra.rb +++ b/activesupport/lib/active_support/orchestra.rb @@ -29,8 +29,9 @@ module ActiveSupport # and is available at ActiveSupport::Orchestra::Listener. # module Orchestra + mattr_accessor :queue + @stacked_events = Hash.new { |h,k| h[k] = [] } - @listeners = [] def self.instrument(name, payload=nil) stack = @stacked_events[Thread.current.object_id] @@ -41,15 +42,11 @@ module ActiveSupport ensure event.finish! stack.delete(event) - @listeners.each { |s| s.push(event) } - end - - def self.register(listener) - @listeners << listener + queue.push(event) end - def self.unregister(listener) - @listeners.delete(listener) + def self.subscribe(pattern=nil, &block) + queue.subscribe(pattern, &block) end class Event @@ -69,35 +66,47 @@ module ActiveSupport end end - class Listener - attr_reader :mutex, :signaler, :thread - + # This is a default queue implementation that ships with Orchestra. It + # consumes events in a thread and publish them to all registered subscribers. + # + class LittleFanout def initialize - @mutex, @signaler = Mutex.new, ConditionVariable.new - @stream = [] + @listeners, @stream = [], [] + @thread = Thread.new do loop do - (event = @stream.shift) ? consume(event) : wait + (event = @stream.shift) ? consume(event) : Thread.stop end end end - def wait - @mutex.synchronize do - @signaler.wait(@mutex) - end + def push(event) + @stream.push(event) + @thread.run end - def push(event) - @mutex.synchronize do - @stream.push(event) - @signaler.broadcast - end + def subscribe(pattern=nil, &block) + @listeners << Listener.new(pattern, &block) end def consume(event) - raise NotImplementedError + @listeners.each { |l| l.publish(event) } + end + + class Listener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + end + + def publish(event) + unless @pattern && event.name.to_s !~ @pattern + @subscriber.call(event) + end + end end end end + + Orchestra.queue = Orchestra::LittleFanout.new end diff --git a/activesupport/test/orchestra_test.rb b/activesupport/test/orchestra_test.rb index 683cc36f6a..e343d6322b 100644 --- a/activesupport/test/orchestra_test.rb +++ b/activesupport/test/orchestra_test.rb @@ -1,5 +1,12 @@ require 'abstract_unit' +# Allow LittleFanout to be cleaned. +class ActiveSupport::Orchestra::LittleFanout + def clear + @listeners.clear + end +end + class OrchestraEventTest < Test::Unit::TestCase def setup @parent = ActiveSupport::Orchestra::Event.new(:parent) @@ -34,12 +41,12 @@ end class OrchestraMainTest < Test::Unit::TestCase def setup - @listener = [] - ActiveSupport::Orchestra.register @listener + @events = [] + ActiveSupport::Orchestra.subscribe { |event| @events << event } end def teardown - ActiveSupport::Orchestra.unregister @listener + ActiveSupport::Orchestra.queue.clear end def test_orchestra_allows_any_action_to_be_instrumented @@ -65,9 +72,9 @@ class OrchestraMainTest < Test::Unit::TestCase 1 + 1 end - assert_equal 1, @listener.size - assert_equal :awesome, @listener.last.name - assert_equal "orchestra", @listener.last.payload + assert_equal 1, @events.size + assert_equal :awesome, @events.last.name + assert_equal "orchestra", @events.last.payload end def test_nested_events_can_be_instrumented @@ -76,18 +83,18 @@ class OrchestraMainTest < Test::Unit::TestCase sleep(0.1) end - assert_equal 1, @listener.size - assert_equal :wot, @listener.first.name - assert_equal "child", @listener.first.payload + assert_equal 1, @events.size + assert_equal :wot, @events.first.name + assert_equal "child", @events.first.payload - assert_nil @listener.first.parent.duration - assert_in_delta 100, @listener.first.duration, 30 + assert_nil @events.first.parent.duration + assert_in_delta 100, @events.first.duration, 30 end - assert_equal 2, @listener.size - assert_equal :awesome, @listener.last.name - assert_equal "orchestra", @listener.last.payload - assert_in_delta 100, @listener.first.parent.duration, 30 + assert_equal 2, @events.size + assert_equal :awesome, @events.last.name + assert_equal "orchestra", @events.last.payload + assert_in_delta 100, @events.first.parent.duration, 30 end def test_event_is_pushed_even_if_block_fails @@ -95,67 +102,43 @@ class OrchestraMainTest < Test::Unit::TestCase raise "OMG" end rescue RuntimeError - assert_equal 1, @listener.size - assert_equal :awesome, @listener.last.name - assert_equal "orchestra", @listener.last.payload + assert_equal 1, @events.size + assert_equal :awesome, @events.last.name + assert_equal "orchestra", @events.last.payload end -end - -class OrchestraListenerTest < Test::Unit::TestCase - class MyListener < ActiveSupport::Orchestra::Listener - attr_reader :consumed - def consume(event) - @consumed ||= [] - @consumed << event - end - end + def test_subscriber_with_pattern + @another = [] + ActiveSupport::Orchestra.subscribe(/cache/) { |event| @another << event } - def setup - @listener = MyListener.new - ActiveSupport::Orchestra.register @listener - end + ActiveSupport::Orchestra.instrument(:something){ 0 } + ActiveSupport::Orchestra.instrument(:cache){ 10 } - def teardown - ActiveSupport::Orchestra.unregister @listener - end - - def test_thread_is_exposed_by_listener - assert_kind_of Thread, @listener.thread - end - - def test_event_is_consumed_when_an_action_is_instrumented - ActiveSupport::Orchestra.instrument(:sum) do - 1 + 1 - end sleep 0.1 - assert_equal 1, @listener.consumed.size - assert_equal :sum, @listener.consumed.first.name - assert_equal 2, @listener.consumed.first.result + + assert_equal 1, @another.size + assert_equal :cache, @another.first.name + assert_equal 10, @another.first.result end - def test_with_sevaral_consumers_and_several_events - @another = MyListener.new - ActiveSupport::Orchestra.register @another + def test_with_several_consumers_and_several_events + @another = [] + ActiveSupport::Orchestra.subscribe { |event| @another << event } 1.upto(100) do |i| - ActiveSupport::Orchestra.instrument(:value) do - i - end + ActiveSupport::Orchestra.instrument(:value){ i } end sleep 0.1 - assert_equal 100, @listener.consumed.size - assert_equal :value, @listener.consumed.first.name - assert_equal 1, @listener.consumed.first.result - assert_equal 100, @listener.consumed.last.result - - assert_equal 100, @another.consumed.size - assert_equal :value, @another.consumed.first.name - assert_equal 1, @another.consumed.first.result - assert_equal 100, @another.consumed.last.result - ensure - ActiveSupport::Orchestra.unregister @another + assert_equal 100, @events.size + assert_equal :value, @events.first.name + assert_equal 1, @events.first.result + assert_equal 100, @events.last.result + + assert_equal 100, @another.size + assert_equal :value, @another.first.name + assert_equal 1, @another.first.result + assert_equal 100, @another.last.result end end -- cgit v1.2.3