aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@gmail.com>2009-10-06 09:42:42 -0300
committerJosé Valim <jose.valim@gmail.com>2009-10-15 18:18:44 -0300
commit5d0f8abc003cc6edfdb471ada05754580725b353 (patch)
tree05b0ce747fc9decd09eb608dce93608d7a61ce06
parent7b7796e23d12b526fa35976c514da91169dd2566 (diff)
downloadrails-5d0f8abc003cc6edfdb471ada05754580725b353.tar.gz
rails-5d0f8abc003cc6edfdb471ada05754580725b353.tar.bz2
rails-5d0f8abc003cc6edfdb471ada05754580725b353.zip
Orchestra listeners have their own queue.
-rw-r--r--activesupport/lib/active_support/orchestra.rb56
-rw-r--r--activesupport/test/orchestra_test.rb14
2 files changed, 47 insertions, 23 deletions
diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb
index 553be68d99..7c9c3074e3 100644
--- a/activesupport/lib/active_support/orchestra.rb
+++ b/activesupport/lib/active_support/orchestra.rb
@@ -3,31 +3,41 @@ require 'active_support/core_ext/module/delegation'
module ActiveSupport
# Orchestra provides an instrumentation API for Ruby. To instrument an action
- # in Ruby you just need to:
+ # in Ruby you just need to do:
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
- # Those actions are consumed by listeners. A listener is anything that responds
- # to push. You can even register an array:
+ # You can consume those events and the information they provide by registering
+ # a subscriber. For instance, let's store all instrumented events in an array:
#
- # @listener = []
- # ActiveSupport::Orchestra.register @listener
+ # @events = []
+ #
+ # ActiveSupport::Orchestra.subscribe do |event|
+ # @events << event
+ # end
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
- # event #=> ActiveSupport::Orchestra::Event
+ # event = @events.first
+ # event.class #=> ActiveSupport::Orchestra::Event
# event.name #=> :render
# event.duration #=> 10 (in miliseconds)
# event.result #=> "Foo"
# event.payload #=> { :extra => :information }
#
- # Orchestra ships with a default listener implementation which puts events in
- # a stream and consume them in a Thread. This implementation is thread safe
- # and is available at ActiveSupport::Orchestra::Listener.
+ # When subscribing to Orchestra, you can pass a pattern, to only consume
+ # events that match the pattern:
+ #
+ # ActiveSupport::Orchestra.subscribe(/render/) do |event|
+ # @render_events << event
+ # end
+ #
+ # Orchestra ships with a queue implementation that consumes and publish events
+ # to subscribers in a thread. You can use any queue implementation you want.
#
module Orchestra
mattr_accessor :queue
@@ -108,37 +118,43 @@ module ActiveSupport
#
class LittleFanout
def initialize
- @listeners, @stream = [], []
-
- @thread = Thread.new do
- loop do
- (event = @stream.shift) ? consume(event) : Thread.stop
- end
- end
+ @listeners, @stream = [], Queue.new
+ @thread = Thread.new { consume }
end
def publish(*event)
@stream.push(event)
- @thread.run
end
def subscribe(pattern=nil, &block)
@listeners << Listener.new(pattern, &block)
end
- def consume(event)
- @listeners.each { |l| l.publish(*event) }
+ def consume
+ while event = @stream.shift
+ @listeners.each { |l| l.publish(*event) }
+ end
end
class Listener
+ attr_reader :thread
+
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
+ @queue = Queue.new
+ @thread = Thread.new { consume }
end
def publish(name, payload)
unless @pattern && name.to_s !~ @pattern
- @subscriber.call(name, payload)
+ @queue << [name, payload]
+ end
+ end
+
+ def consume
+ while event = @queue.shift
+ @subscriber.call(*event)
end
end
end
diff --git a/activesupport/test/orchestra_test.rb b/activesupport/test/orchestra_test.rb
index 608531416c..810d99ebeb 100644
--- a/activesupport/test/orchestra_test.rb
+++ b/activesupport/test/orchestra_test.rb
@@ -50,6 +50,8 @@ class OrchestraMainTest < Test::Unit::TestCase
1 + 1
end
+ sleep(0.1)
+
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "orchestra"], @events.last.payload
@@ -58,18 +60,22 @@ class OrchestraMainTest < Test::Unit::TestCase
def test_nested_events_can_be_instrumented
ActiveSupport::Orchestra.instrument(:awesome, :payload => "orchestra") do
ActiveSupport::Orchestra.instrument(:wot, :payload => "child") do
- sleep(0.1)
+ 1 + 1
end
+ sleep(0.1)
+
assert_equal 1, @events.size
assert_equal :wot, @events.first.name
assert_equal Hash[:payload => "child"], @events.first.payload
- assert_in_delta 100, @events.first.duration, 30
end
+ sleep(0.1)
+
assert_equal 2, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "orchestra"], @events.last.payload
+ assert_in_delta 100, @events.last.duration, 70
end
def test_event_is_pushed_even_if_block_fails
@@ -77,6 +83,8 @@ class OrchestraMainTest < Test::Unit::TestCase
raise "OMG"
end rescue RuntimeError
+ sleep(0.1)
+
assert_equal 1, @events.size
assert_equal :awesome, @events.last.name
assert_equal Hash[:payload => "orchestra"], @events.last.payload
@@ -89,7 +97,7 @@ class OrchestraMainTest < Test::Unit::TestCase
ActiveSupport::Orchestra.instrument(:something){ 0 }
ActiveSupport::Orchestra.instrument(:cache){ 10 }
- sleep 0.1
+ sleep(0.1)
assert_equal 1, @another.size
assert_equal :cache, @another.first.name