aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/orchestra.rb
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@gmail.com>2009-10-06 09:42:42 -0300
committerJosé Valim <jose.valim@gmail.com>2009-10-15 18:18:44 -0300
commit5d0f8abc003cc6edfdb471ada05754580725b353 (patch)
tree05b0ce747fc9decd09eb608dce93608d7a61ce06 /activesupport/lib/active_support/orchestra.rb
parent7b7796e23d12b526fa35976c514da91169dd2566 (diff)
downloadrails-5d0f8abc003cc6edfdb471ada05754580725b353.tar.gz
rails-5d0f8abc003cc6edfdb471ada05754580725b353.tar.bz2
rails-5d0f8abc003cc6edfdb471ada05754580725b353.zip
Orchestra listeners have their own queue.
Diffstat (limited to 'activesupport/lib/active_support/orchestra.rb')
-rw-r--r--activesupport/lib/active_support/orchestra.rb56
1 files changed, 36 insertions, 20 deletions
diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb
index 553be68d99..7c9c3074e3 100644
--- a/activesupport/lib/active_support/orchestra.rb
+++ b/activesupport/lib/active_support/orchestra.rb
@@ -3,31 +3,41 @@ require 'active_support/core_ext/module/delegation'
module ActiveSupport
# Orchestra provides an instrumentation API for Ruby. To instrument an action
- # in Ruby you just need to:
+ # in Ruby you just need to do:
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
- # Those actions are consumed by listeners. A listener is anything that responds
- # to push. You can even register an array:
+ # You can consume those events and the information they provide by registering
+ # a subscriber. For instance, let's store all instrumented events in an array:
#
- # @listener = []
- # ActiveSupport::Orchestra.register @listener
+ # @events = []
+ #
+ # ActiveSupport::Orchestra.subscribe do |event|
+ # @events << event
+ # end
#
# ActiveSupport::Orchestra.instrument(:render, :extra => :information) do
# render :text => "Foo"
# end
#
- # event #=> ActiveSupport::Orchestra::Event
+ # event = @events.first
+ # event.class #=> ActiveSupport::Orchestra::Event
# event.name #=> :render
# event.duration #=> 10 (in miliseconds)
# event.result #=> "Foo"
# event.payload #=> { :extra => :information }
#
- # Orchestra ships with a default listener implementation which puts events in
- # a stream and consume them in a Thread. This implementation is thread safe
- # and is available at ActiveSupport::Orchestra::Listener.
+ # When subscribing to Orchestra, you can pass a pattern, to only consume
+ # events that match the pattern:
+ #
+ # ActiveSupport::Orchestra.subscribe(/render/) do |event|
+ # @render_events << event
+ # end
+ #
+ # Orchestra ships with a queue implementation that consumes and publish events
+ # to subscribers in a thread. You can use any queue implementation you want.
#
module Orchestra
mattr_accessor :queue
@@ -108,37 +118,43 @@ module ActiveSupport
#
class LittleFanout
def initialize
- @listeners, @stream = [], []
-
- @thread = Thread.new do
- loop do
- (event = @stream.shift) ? consume(event) : Thread.stop
- end
- end
+ @listeners, @stream = [], Queue.new
+ @thread = Thread.new { consume }
end
def publish(*event)
@stream.push(event)
- @thread.run
end
def subscribe(pattern=nil, &block)
@listeners << Listener.new(pattern, &block)
end
- def consume(event)
- @listeners.each { |l| l.publish(*event) }
+ def consume
+ while event = @stream.shift
+ @listeners.each { |l| l.publish(*event) }
+ end
end
class Listener
+ attr_reader :thread
+
def initialize(pattern, &block)
@pattern = pattern
@subscriber = block
+ @queue = Queue.new
+ @thread = Thread.new { consume }
end
def publish(name, payload)
unless @pattern && name.to_s !~ @pattern
- @subscriber.call(name, payload)
+ @queue << [name, payload]
+ end
+ end
+
+ def consume
+ while event = @queue.shift
+ @subscriber.call(*event)
end
end
end