From 4f2a04cc085b9117e8af8079a95a063f671d7a3d Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Sat, 28 Nov 2009 12:49:07 -0800 Subject: Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns --- activesupport/lib/active_support/notifications.rb | 148 +++------------------- 1 file changed, 16 insertions(+), 132 deletions(-) (limited to 'activesupport/lib/active_support/notifications.rb') diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 316d80e064..d9bfcbfcab 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -1,7 +1,4 @@ -require 'thread' require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' -require 'active_support/secure_random' module ActiveSupport # Notifications provides an instrumentation API for Ruby. To instrument an @@ -41,155 +38,42 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + autoload :Instrumenter, 'active_support/notifications/instrumenter' + autoload :Event, 'active_support/notifications/instrumenter' + autoload :Fanout, 'active_support/notifications/fanout' class << self - delegate :instrument, :transaction_id, :transaction, :to => :instrumenter + attr_writer :notifier + delegate :publish, :subscribe, :instrument, :to => :notifier - def instrumenter - Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) - end - - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) + def notifier + @notifier ||= Notifier.new end end - class Instrumenter - def initialize(publisher) - @publisher = publisher - @id = random_id - end - - def transaction - @id, old_id = random_id, @id - yield - ensure - @id = old_id - end - - def transaction_id - @id - end - - def instrument(name, payload={}) - time = Time.now - result = yield if block_given? - ensure - @publisher.publish(name, time, Time.now, result, @id, payload) - end - - private - def random_id - SecureRandom.hex(10) - end - end - - class Publisher - def initialize(queue) + class Notifier + def initialize(queue = Fanout.new) @queue = queue end def publish(*args) @queue.publish(*args) end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - def bind(pattern) - @pattern = pattern - self - end - - def subscribe - @queue.subscribe(@pattern) do |*args| - yield(*args) - end - end - end - - class Event - attr_reader :name, :time, :end, :transaction_id, :result, :payload - - def initialize(name, start, ending, result, transaction_id, payload) - @name = name - @payload = payload.dup - @time = start - @transaction_id = transaction_id - @end = ending - @result = result - end - - def duration - @duration ||= 1000.0 * (@end - @time) - end - - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + duration >= event.duration) - end - end - - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners = [] - end - - def publish(*args) - @listeners.each { |l| l.publish(*args) } - end - - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) + def subscribe(pattern = nil, &block) + @queue.bind(pattern).subscribe(&block) end def wait - sleep 0.05 until drained? + @queue.wait end - private - def drained? - @listeners.all? &:drained? - end - - # Used for internal implementation only. - class Listener #:nodoc: - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end + delegate :instrument, :to => :current_instrumenter - def drained? - @queue.size.zero? + private + def current_instrumenter + Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self) end - end end end - - Notifications.queue = Notifications::LittleFanout.new end -- cgit v1.2.3