path: root/activesupport/lib
diff options
authorJosé Valim <jose.valim@gmail.com>2009-12-01 13:00:34 -0200
committerJosé Valim <jose.valim@gmail.com>2009-12-01 13:00:34 -0200
commitc2e97cb410d759f383d29920165abdbf4b70e019 (patch)
treeb375de84add24ee4c4551deec533b0667512bf34 /activesupport/lib
parentfc3629f6ca2b43693f5447a1fb43881f1814e117 (diff)
parent6ac32a83283f46b55675ddf4ecab6c91f6f8abde (diff)
Merge branch 'master' of git://github.com/rails/rails
Diffstat (limited to 'activesupport/lib')
3 files changed, 165 insertions, 151 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index e2540cd598..d9bfcbfcab 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -1,7 +1,4 @@
-require 'thread'
require 'active_support/core_ext/module/delegation'
-require 'active_support/core_ext/module/attribute_accessors'
-require 'active_support/secure_random'
module ActiveSupport
# Notifications provides an instrumentation API for Ruby. To instrument an
@@ -41,173 +38,42 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
module Notifications
- mattr_accessor :queue, :listener
+ autoload :Instrumenter, 'active_support/notifications/instrumenter'
+ autoload :Event, 'active_support/notifications/instrumenter'
+ autoload :Fanout, 'active_support/notifications/fanout'
class << self
- delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
+ attr_writer :notifier
+ delegate :publish, :subscribe, :instrument, :to => :notifier
- def instrumenter
- Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
- end
- def publisher
- @publisher ||= Publisher.new(queue)
- end
- def subscriber
- @subscriber ||= Subscriber.new(queue)
- end
- def subscribe(pattern=nil, options={}, &block)
- with = options[:with] || listener
- subscriber.bind(with, pattern).subscribe(&block)
+ def notifier
+ @notifier ||= Notifier.new
- class Instrumenter
- def initialize(publisher)
- @publisher = publisher
- @id = random_id
- end
- def transaction
- @id, old_id = random_id, @id
- yield
- ensure
- @id = old_id
- end
- def transaction_id
- @id
- end
- def instrument(name, payload={})
- time = Time.now
- result = yield if block_given?
- ensure
- @publisher.publish(name, time, Time.now, result, @id, payload)
- end
- private
- def random_id
- SecureRandom.hex(10)
- end
- end
- class Publisher
- def initialize(queue)
+ class Notifier
+ def initialize(queue = Fanout.new)
@queue = queue
def publish(*args)
- end
- class Subscriber
- def initialize(queue)
- @queue = queue
- end
- def bind(listener, pattern)
- @listener = listener
- @pattern = pattern
- self
- end
- def subscribe
- @queue.subscribe(@listener, @pattern) do |*args|
- yield(*args)
- end
+ def subscribe(pattern = nil, &block)
+ @queue.bind(pattern).subscribe(&block)
- end
- class Event
- attr_reader :name, :time, :end, :transaction_id, :result, :payload
- def initialize(name, start, ending, result, transaction_id, payload)
- @name = name
- @payload = payload.dup
- @time = start
- @transaction_id = transaction_id
- @end = ending
- @result = result
+ def wait
+ @queue.wait
- def duration
- @duration ||= 1000.0 * (@end - @time)
- end
+ delegate :instrument, :to => :current_instrumenter
- def parent_of?(event)
- start = (self.time - event.time) * 1000
- start <= 0 && (start + duration >= event.duration)
- end
- end
- class AsyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
- def drained?
- @queue.size.zero?
- end
- end
- class SyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- end
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @subscriber.call(*args.unshift(name))
+ private
+ def current_instrumenter
+ Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self)
- end
- def drained?
- true
- end
- end
- # This is a default queue implementation that ships with Notifications. It
- # consumes events in a thread and publish them to all registered subscribers.
- #
- class LittleFanout
- def initialize
- @listeners = []
- end
- def publish(*args)
- @listeners.each { |l| l.publish(*args) }
- end
- def subscribe(listener, pattern=nil, &block)
- @listeners << listener.new(pattern, &block)
- end
- def drained?
- @listeners.all? &:drained?
- end
- Notifications.queue = Notifications::LittleFanout.new
- Notifications.listener = Notifications::AsyncListener
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
new file mode 100644
index 0000000000..bb07e4765c
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -0,0 +1,101 @@
+require 'thread'
+module ActiveSupport
+ module Notifications
+ # This is a default queue implementation that ships with Notifications. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class Fanout
+ def initialize(sync = false)
+ @subscriber_klass = sync ? Subscriber : AsyncSubscriber
+ @subscribers = []
+ end
+ def bind(pattern)
+ Binding.new(self, pattern)
+ end
+ def subscribe(pattern = nil, &block)
+ @subscribers << @subscriber_klass.new(pattern, &block)
+ end
+ def publish(*args)
+ @subscribers.each { |s| s.publish(*args) }
+ end
+ def wait
+ sleep(0.05) until @subscribers.all?(&:drained?)
+ end
+ # Used for internal implementation only.
+ class Binding #:nodoc:
+ def initialize(queue, pattern)
+ @queue = queue
+ @pattern =
+ case pattern
+ when Regexp, NilClass
+ pattern
+ else
+ /^#{Regexp.escape(pattern.to_s)}/
+ end
+ end
+ def subscribe(&block)
+ @queue.subscribe(@pattern, &block)
+ end
+ end
+ class Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @block = block
+ end
+ def publish(*args)
+ push(*args) if matches?(args.first)
+ end
+ def drained?
+ true
+ end
+ private
+ def matches?(name)
+ !@pattern || @pattern =~ name.to_s
+ end
+ def push(*args)
+ @block.call(*args)
+ end
+ end
+ # Used for internal implementation only.
+ class AsyncSubscriber < Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ super
+ @events = Queue.new
+ start_consumer
+ end
+ def drained?
+ @events.empty?
+ end
+ private
+ def start_consumer
+ Thread.new { consume }
+ end
+ def consume
+ while args = @events.shift
+ @block.call(*args)
+ end
+ end
+ def push(*args)
+ @events << args
+ end
+ end
+ end
+ end
diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb
new file mode 100644
index 0000000000..fb95422af2
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/instrumenter.rb
@@ -0,0 +1,47 @@
+require 'active_support/secure_random'
+require 'active_support/core_ext/module/delegation'
+module ActiveSupport
+ module Notifications
+ class Instrumenter
+ def initialize(notifier)
+ @id = unique_id
+ @notifier = notifier
+ end
+ def instrument(name, payload={})
+ time = Time.now
+ result = yield if block_given?
+ ensure
+ @notifier.publish(name, time, Time.now, result, @id, payload)
+ end
+ private
+ def unique_id
+ SecureRandom.hex(10)
+ end
+ end
+ class Event
+ attr_reader :name, :time, :end, :transaction_id, :result, :payload
+ def initialize(name, start, ending, result, transaction_id, payload)
+ @name = name
+ @payload = payload.dup
+ @time = start
+ @transaction_id = transaction_id
+ @end = ending
+ @result = result
+ end
+ def duration
+ @duration ||= 1000.0 * (@end - @time)
+ end
+ def parent_of?(event)
+ start = (self.time - event.time) * 1000
+ start <= 0 && (start + duration >= event.duration)
+ end
+ end
+ end