From 6f7fc5824f2033c0f674b002dbee7f1c3f3384ac Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Tue, 24 Nov 2009 19:26:13 -0800 Subject: Revert "Create SyncListener. Since they do not rely on Thread, they can be used on Google App Engine." Take a step back on this API direction. This reverts commit 8104f65c3225453d13307c3c2733c2a8f99e491a. --- activesupport/lib/active_support/notifications.rb | 92 +++++++++-------------- 1 file changed, 34 insertions(+), 58 deletions(-) (limited to 'activesupport/lib/active_support/notifications.rb') diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index e2540cd598..7a9f76b26a 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -41,7 +41,7 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue, :listener + mattr_accessor :queue class << self delegate :instrument, :transaction_id, :transaction, :to => :instrumenter @@ -54,13 +54,8 @@ module ActiveSupport @publisher ||= Publisher.new(queue) end - def subscriber - @subscriber ||= Subscriber.new(queue) - end - - def subscribe(pattern=nil, options={}, &block) - with = options[:with] || listener - subscriber.bind(with, pattern).subscribe(&block) + def subscribe(pattern=nil, &block) + Subscriber.new(queue).bind(pattern).subscribe(&block) end end @@ -109,14 +104,13 @@ module ActiveSupport @queue = queue end - def bind(listener, pattern) - @listener = listener - @pattern = pattern + def bind(pattern) + @pattern = pattern self end def subscribe - @queue.subscribe(@listener, @pattern) do |*args| + @queue.subscribe(@pattern) do |*args| yield(*args) end end @@ -144,48 +138,6 @@ module ActiveSupport end end - class AsyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end - - def drained? - @queue.size.zero? - end - end - - class SyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @subscriber.call(*args.unshift(name)) - end - end - - def drained? - true - end - end - # This is a default queue implementation that ships with Notifications. It # consumes events in a thread and publish them to all registered subscribers. # @@ -198,16 +150,40 @@ module ActiveSupport @listeners.each { |l| l.publish(*args) } end - def subscribe(listener, pattern=nil, &block) - @listeners << listener.new(pattern, &block) + def subscribe(pattern=nil, &block) + @listeners << Listener.new(pattern, &block) end def drained? @listeners.all? &:drained? end + + class Listener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + @queue = Queue.new + Thread.new { consume } + end + + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @queue << args.unshift(name) + end + end + + def consume + while args = @queue.shift + @subscriber.call(*args) + end + end + + def drained? + @queue.size.zero? + end + end end end - Notifications.queue = Notifications::LittleFanout.new - Notifications.listener = Notifications::AsyncListener + Notifications.queue = Notifications::LittleFanout.new end -- cgit v1.2.3 From ddf681ce1d3e71aef913dd7f94c60b7622523f8b Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Tue, 24 Nov 2009 19:35:01 -0800 Subject: Expose a simple Queue#wait to block until all notifications are drained --- activesupport/lib/active_support/notifications.rb | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'activesupport/lib/active_support/notifications.rb') diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 7a9f76b26a..09b1aa1713 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -154,10 +154,15 @@ module ActiveSupport @listeners << Listener.new(pattern, &block) end - def drained? - @listeners.all? &:drained? + def wait + sleep 0.05 until drained? end + private + def drained? + @listeners.all? &:drained? + end + class Listener def initialize(pattern, &block) @pattern = pattern -- cgit v1.2.3 From 02893d17053123cbf02b65c2fe549421c11a2604 Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Tue, 24 Nov 2009 19:37:10 -0800 Subject: Remark that Listener is an implementation detail --- activesupport/lib/active_support/notifications.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'activesupport/lib/active_support/notifications.rb') diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 09b1aa1713..316d80e064 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -163,7 +163,8 @@ module ActiveSupport @listeners.all? &:drained? end - class Listener + # Used for internal implementation only. + class Listener #:nodoc: def initialize(pattern, &block) @pattern = pattern @subscriber = block -- cgit v1.2.3 From 4f2a04cc085b9117e8af8079a95a063f671d7a3d Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Sat, 28 Nov 2009 12:49:07 -0800 Subject: Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns --- activesupport/lib/active_support/notifications.rb | 148 +++------------------- 1 file changed, 16 insertions(+), 132 deletions(-) (limited to 'activesupport/lib/active_support/notifications.rb') diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 316d80e064..d9bfcbfcab 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -1,7 +1,4 @@ -require 'thread' require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' -require 'active_support/secure_random' module ActiveSupport # Notifications provides an instrumentation API for Ruby. To instrument an @@ -41,155 +38,42 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + autoload :Instrumenter, 'active_support/notifications/instrumenter' + autoload :Event, 'active_support/notifications/instrumenter' + autoload :Fanout, 'active_support/notifications/fanout' class << self - delegate :instrument, :transaction_id, :transaction, :to => :instrumenter + attr_writer :notifier + delegate :publish, :subscribe, :instrument, :to => :notifier - def instrumenter - Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) - end - - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) + def notifier + @notifier ||= Notifier.new end end - class Instrumenter - def initialize(publisher) - @publisher = publisher - @id = random_id - end - - def transaction - @id, old_id = random_id, @id - yield - ensure - @id = old_id - end - - def transaction_id - @id - end - - def instrument(name, payload={}) - time = Time.now - result = yield if block_given? - ensure - @publisher.publish(name, time, Time.now, result, @id, payload) - end - - private - def random_id - SecureRandom.hex(10) - end - end - - class Publisher - def initialize(queue) + class Notifier + def initialize(queue = Fanout.new) @queue = queue end def publish(*args) @queue.publish(*args) end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - def bind(pattern) - @pattern = pattern - self - end - - def subscribe - @queue.subscribe(@pattern) do |*args| - yield(*args) - end - end - end - - class Event - attr_reader :name, :time, :end, :transaction_id, :result, :payload - - def initialize(name, start, ending, result, transaction_id, payload) - @name = name - @payload = payload.dup - @time = start - @transaction_id = transaction_id - @end = ending - @result = result - end - - def duration - @duration ||= 1000.0 * (@end - @time) - end - - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + duration >= event.duration) - end - end - - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners = [] - end - - def publish(*args) - @listeners.each { |l| l.publish(*args) } - end - - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) + def subscribe(pattern = nil, &block) + @queue.bind(pattern).subscribe(&block) end def wait - sleep 0.05 until drained? + @queue.wait end - private - def drained? - @listeners.all? &:drained? - end - - # Used for internal implementation only. - class Listener #:nodoc: - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end + delegate :instrument, :to => :current_instrumenter - def drained? - @queue.size.zero? + private + def current_instrumenter + Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self) end - end end end - - Notifications.queue = Notifications::LittleFanout.new end -- cgit v1.2.3