From 5d0f8abc003cc6edfdb471ada05754580725b353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Tue, 6 Oct 2009 09:42:42 -0300 Subject: Orchestra listeners have their own queue. --- activesupport/lib/active_support/orchestra.rb | 56 +++++++++++++++++---------- activesupport/test/orchestra_test.rb | 14 +++++-- 2 files changed, 47 insertions(+), 23 deletions(-) diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb index 553be68d99..7c9c3074e3 100644 --- a/activesupport/lib/active_support/orchestra.rb +++ b/activesupport/lib/active_support/orchestra.rb @@ -3,31 +3,41 @@ require 'active_support/core_ext/module/delegation' module ActiveSupport # Orchestra provides an instrumentation API for Ruby. To instrument an action - # in Ruby you just need to: + # in Ruby you just need to do: # # ActiveSupport::Orchestra.instrument(:render, :extra => :information) do # render :text => "Foo" # end # - # Those actions are consumed by listeners. A listener is anything that responds - # to push. You can even register an array: + # You can consume those events and the information they provide by registering + # a subscriber. For instance, let's store all instrumented events in an array: # - # @listener = [] - # ActiveSupport::Orchestra.register @listener + # @events = [] + # + # ActiveSupport::Orchestra.subscribe do |event| + # @events << event + # end # # ActiveSupport::Orchestra.instrument(:render, :extra => :information) do # render :text => "Foo" # end # - # event #=> ActiveSupport::Orchestra::Event + # event = @events.first + # event.class #=> ActiveSupport::Orchestra::Event # event.name #=> :render # event.duration #=> 10 (in miliseconds) # event.result #=> "Foo" # event.payload #=> { :extra => :information } # - # Orchestra ships with a default listener implementation which puts events in - # a stream and consume them in a Thread. This implementation is thread safe - # and is available at ActiveSupport::Orchestra::Listener. + # When subscribing to Orchestra, you can pass a pattern, to only consume + # events that match the pattern: + # + # ActiveSupport::Orchestra.subscribe(/render/) do |event| + # @render_events << event + # end + # + # Orchestra ships with a queue implementation that consumes and publish events + # to subscribers in a thread. You can use any queue implementation you want. # module Orchestra mattr_accessor :queue @@ -108,37 +118,43 @@ module ActiveSupport # class LittleFanout def initialize - @listeners, @stream = [], [] - - @thread = Thread.new do - loop do - (event = @stream.shift) ? consume(event) : Thread.stop - end - end + @listeners, @stream = [], Queue.new + @thread = Thread.new { consume } end def publish(*event) @stream.push(event) - @thread.run end def subscribe(pattern=nil, &block) @listeners << Listener.new(pattern, &block) end - def consume(event) - @listeners.each { |l| l.publish(*event) } + def consume + while event = @stream.shift + @listeners.each { |l| l.publish(*event) } + end end class Listener + attr_reader :thread + def initialize(pattern, &block) @pattern = pattern @subscriber = block + @queue = Queue.new + @thread = Thread.new { consume } end def publish(name, payload) unless @pattern && name.to_s !~ @pattern - @subscriber.call(name, payload) + @queue << [name, payload] + end + end + + def consume + while event = @queue.shift + @subscriber.call(*event) end end end diff --git a/activesupport/test/orchestra_test.rb b/activesupport/test/orchestra_test.rb index 608531416c..810d99ebeb 100644 --- a/activesupport/test/orchestra_test.rb +++ b/activesupport/test/orchestra_test.rb @@ -50,6 +50,8 @@ class OrchestraMainTest < Test::Unit::TestCase 1 + 1 end + sleep(0.1) + assert_equal 1, @events.size assert_equal :awesome, @events.last.name assert_equal Hash[:payload => "orchestra"], @events.last.payload @@ -58,18 +60,22 @@ class OrchestraMainTest < Test::Unit::TestCase def test_nested_events_can_be_instrumented ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do ActiveSupport::Orchestra.instrument(:wot, :payload => "child") do - sleep(0.1) + 1 + 1 end + sleep(0.1) + assert_equal 1, @events.size assert_equal :wot, @events.first.name assert_equal Hash[:payload => "child"], @events.first.payload - assert_in_delta 100, @events.first.duration, 30 end + sleep(0.1) + assert_equal 2, @events.size assert_equal :awesome, @events.last.name assert_equal Hash[:payload => "orchestra"], @events.last.payload + assert_in_delta 100, @events.last.duration, 70 end def test_event_is_pushed_even_if_block_fails @@ -77,6 +83,8 @@ class OrchestraMainTest < Test::Unit::TestCase raise "OMG" end rescue RuntimeError + sleep(0.1) + assert_equal 1, @events.size assert_equal :awesome, @events.last.name assert_equal Hash[:payload => "orchestra"], @events.last.payload @@ -89,7 +97,7 @@ class OrchestraMainTest < Test::Unit::TestCase ActiveSupport::Orchestra.instrument(:something){ 0 } ActiveSupport::Orchestra.instrument(:cache){ 10 } - sleep 0.1 + sleep(0.1) assert_equal 1, @another.size assert_equal :cache, @another.first.name -- cgit v1.2.3