diff options
author | Mikel Lindsaar <raasdnil@gmail.com> | 2009-12-17 11:24:02 +1100 |
---|---|---|
committer | Mikel Lindsaar <raasdnil@gmail.com> | 2009-12-17 11:24:02 +1100 |
commit | 186cd7bc530f705b889c27f3680ab48c7c10a6f3 (patch) | |
tree | 76955e442615d77ee05ef2a8260c5373e1cac680 /activesupport/lib/active_support/notifications.rb | |
parent | 5f2395041d1578433fa825ed5c6f26a201f2203d (diff) | |
parent | b9d4ceb43c9497fb1c47d8b1e1e6a24a9e157384 (diff) | |
download | rails-186cd7bc530f705b889c27f3680ab48c7c10a6f3.tar.gz rails-186cd7bc530f705b889c27f3680ab48c7c10a6f3.tar.bz2 rails-186cd7bc530f705b889c27f3680ab48c7c10a6f3.zip |
Merge branch 'rails'
Conflicts:
actionmailer/lib/action_mailer.rb
actionmailer/lib/action_mailer/delivery_method/smtp.rb
Diffstat (limited to 'activesupport/lib/active_support/notifications.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications.rb | 168 |
1 files changed, 17 insertions, 151 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index e2540cd598..d9bfcbfcab 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -1,7 +1,4 @@ -require 'thread' require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' -require 'active_support/secure_random' module ActiveSupport # Notifications provides an instrumentation API for Ruby. To instrument an @@ -41,173 +38,42 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue, :listener + autoload :Instrumenter, 'active_support/notifications/instrumenter' + autoload :Event, 'active_support/notifications/instrumenter' + autoload :Fanout, 'active_support/notifications/fanout' class << self - delegate :instrument, :transaction_id, :transaction, :to => :instrumenter + attr_writer :notifier + delegate :publish, :subscribe, :instrument, :to => :notifier - def instrumenter - Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) - end - - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscriber - @subscriber ||= Subscriber.new(queue) - end - - def subscribe(pattern=nil, options={}, &block) - with = options[:with] || listener - subscriber.bind(with, pattern).subscribe(&block) + def notifier + @notifier ||= Notifier.new end end - class Instrumenter - def initialize(publisher) - @publisher = publisher - @id = random_id - end - - def transaction - @id, old_id = random_id, @id - yield - ensure - @id = old_id - end - - def transaction_id - @id - end - - def instrument(name, payload={}) - time = Time.now - result = yield if block_given? - ensure - @publisher.publish(name, time, Time.now, result, @id, payload) - end - - private - def random_id - SecureRandom.hex(10) - end - end - - class Publisher - def initialize(queue) + class Notifier + def initialize(queue = Fanout.new) @queue = queue end def publish(*args) @queue.publish(*args) end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - - def bind(listener, pattern) - @listener = listener - @pattern = pattern - self - end - def subscribe - @queue.subscribe(@listener, @pattern) do |*args| - yield(*args) - end + def subscribe(pattern = nil, &block) + @queue.bind(pattern).subscribe(&block) end - end - - class Event - attr_reader :name, :time, :end, :transaction_id, :result, :payload - def initialize(name, start, ending, result, transaction_id, payload) - @name = name - @payload = payload.dup - @time = start - @transaction_id = transaction_id - @end = ending - @result = result + def wait + @queue.wait end - def duration - @duration ||= 1000.0 * (@end - @time) - end + delegate :instrument, :to => :current_instrumenter - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + duration >= event.duration) - end - end - - class AsyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end - - def drained? - @queue.size.zero? - end - end - - class SyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @subscriber.call(*args.unshift(name)) + private + def current_instrumenter + Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self) end - end - - def drained? - true - end - end - - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners = [] - end - - def publish(*args) - @listeners.each { |l| l.publish(*args) } - end - - def subscribe(listener, pattern=nil, &block) - @listeners << listener.new(pattern, &block) - end - - def drained? - @listeners.all? &:drained? - end end end - - Notifications.queue = Notifications::LittleFanout.new - Notifications.listener = Notifications::AsyncListener end |