aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/notifications.rb')
-rw-r--r--activesupport/lib/active_support/notifications.rb168
1 files changed, 17 insertions, 151 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index e2540cd598..d9bfcbfcab 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -1,7 +1,4 @@
-require 'thread'
require 'active_support/core_ext/module/delegation'
-require 'active_support/core_ext/module/attribute_accessors'
-require 'active_support/secure_random'
module ActiveSupport
# Notifications provides an instrumentation API for Ruby. To instrument an
@@ -41,173 +38,42 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue, :listener
+ autoload :Instrumenter, 'active_support/notifications/instrumenter'
+ autoload :Event, 'active_support/notifications/instrumenter'
+ autoload :Fanout, 'active_support/notifications/fanout'
class << self
- delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
+ attr_writer :notifier
+ delegate :publish, :subscribe, :instrument, :to => :notifier
- def instrumenter
- Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
- end
-
- def publisher
- @publisher ||= Publisher.new(queue)
- end
-
- def subscriber
- @subscriber ||= Subscriber.new(queue)
- end
-
- def subscribe(pattern=nil, options={}, &block)
- with = options[:with] || listener
- subscriber.bind(with, pattern).subscribe(&block)
+ def notifier
+ @notifier ||= Notifier.new
end
end
- class Instrumenter
- def initialize(publisher)
- @publisher = publisher
- @id = random_id
- end
-
- def transaction
- @id, old_id = random_id, @id
- yield
- ensure
- @id = old_id
- end
-
- def transaction_id
- @id
- end
-
- def instrument(name, payload={})
- time = Time.now
- result = yield if block_given?
- ensure
- @publisher.publish(name, time, Time.now, result, @id, payload)
- end
-
- private
- def random_id
- SecureRandom.hex(10)
- end
- end
-
- class Publisher
- def initialize(queue)
+ class Notifier
+ def initialize(queue = Fanout.new)
@queue = queue
end
def publish(*args)
@queue.publish(*args)
end
- end
-
- class Subscriber
- def initialize(queue)
- @queue = queue
- end
-
- def bind(listener, pattern)
- @listener = listener
- @pattern = pattern
- self
- end
- def subscribe
- @queue.subscribe(@listener, @pattern) do |*args|
- yield(*args)
- end
+ def subscribe(pattern = nil, &block)
+ @queue.bind(pattern).subscribe(&block)
end
- end
-
- class Event
- attr_reader :name, :time, :end, :transaction_id, :result, :payload
- def initialize(name, start, ending, result, transaction_id, payload)
- @name = name
- @payload = payload.dup
- @time = start
- @transaction_id = transaction_id
- @end = ending
- @result = result
+ def wait
+ @queue.wait
end
- def duration
- @duration ||= 1000.0 * (@end - @time)
- end
+ delegate :instrument, :to => :current_instrumenter
- def parent_of?(event)
- start = (self.time - event.time) * 1000
- start <= 0 && (start + duration >= event.duration)
- end
- end
-
- class AsyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
-
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
-
- def drained?
- @queue.size.zero?
- end
- end
-
- class SyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @subscriber.call(*args.unshift(name))
+ private
+ def current_instrumenter
+ Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self)
end
- end
-
- def drained?
- true
- end
- end
-
- # This is a default queue implementation that ships with Notifications. It
- # consumes events in a thread and publish them to all registered subscribers.
- #
- class LittleFanout
- def initialize
- @listeners = []
- end
-
- def publish(*args)
- @listeners.each { |l| l.publish(*args) }
- end
-
- def subscribe(listener, pattern=nil, &block)
- @listeners << listener.new(pattern, &block)
- end
-
- def drained?
- @listeners.all? &:drained?
- end
end
end
-
- Notifications.queue = Notifications::LittleFanout.new
- Notifications.listener = Notifications::AsyncListener
end