aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support
diff options
context:
space:
mode:
authorJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:49:07 -0800
committerJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:50:09 -0800
commit4f2a04cc085b9117e8af8079a95a063f671d7a3d (patch)
treefd96c9bc52dc13d3338ed08f4afcd1c685dc6348 /activesupport/lib/active_support
parent02893d17053123cbf02b65c2fe549421c11a2604 (diff)
downloadrails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.gz
rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.bz2
rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.zip
Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns
Diffstat (limited to 'activesupport/lib/active_support')
-rw-r--r--activesupport/lib/active_support/notifications.rb148
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb84
-rw-r--r--activesupport/lib/active_support/notifications/instrumenter.rb47
3 files changed, 147 insertions, 132 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index 316d80e064..d9bfcbfcab 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -1,7 +1,4 @@
-require 'thread'
require 'active_support/core_ext/module/delegation'
-require 'active_support/core_ext/module/attribute_accessors'
-require 'active_support/secure_random'
module ActiveSupport
# Notifications provides an instrumentation API for Ruby. To instrument an
@@ -41,155 +38,42 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue
+ autoload :Instrumenter, 'active_support/notifications/instrumenter'
+ autoload :Event, 'active_support/notifications/instrumenter'
+ autoload :Fanout, 'active_support/notifications/fanout'
class << self
- delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
+ attr_writer :notifier
+ delegate :publish, :subscribe, :instrument, :to => :notifier
- def instrumenter
- Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
- end
-
- def publisher
- @publisher ||= Publisher.new(queue)
- end
-
- def subscribe(pattern=nil, &block)
- Subscriber.new(queue).bind(pattern).subscribe(&block)
+ def notifier
+ @notifier ||= Notifier.new
end
end
- class Instrumenter
- def initialize(publisher)
- @publisher = publisher
- @id = random_id
- end
-
- def transaction
- @id, old_id = random_id, @id
- yield
- ensure
- @id = old_id
- end
-
- def transaction_id
- @id
- end
-
- def instrument(name, payload={})
- time = Time.now
- result = yield if block_given?
- ensure
- @publisher.publish(name, time, Time.now, result, @id, payload)
- end
-
- private
- def random_id
- SecureRandom.hex(10)
- end
- end
-
- class Publisher
- def initialize(queue)
+ class Notifier
+ def initialize(queue = Fanout.new)
@queue = queue
end
def publish(*args)
@queue.publish(*args)
end
- end
-
- class Subscriber
- def initialize(queue)
- @queue = queue
- end
- def bind(pattern)
- @pattern = pattern
- self
- end
-
- def subscribe
- @queue.subscribe(@pattern) do |*args|
- yield(*args)
- end
- end
- end
-
- class Event
- attr_reader :name, :time, :end, :transaction_id, :result, :payload
-
- def initialize(name, start, ending, result, transaction_id, payload)
- @name = name
- @payload = payload.dup
- @time = start
- @transaction_id = transaction_id
- @end = ending
- @result = result
- end
-
- def duration
- @duration ||= 1000.0 * (@end - @time)
- end
-
- def parent_of?(event)
- start = (self.time - event.time) * 1000
- start <= 0 && (start + duration >= event.duration)
- end
- end
-
- # This is a default queue implementation that ships with Notifications. It
- # consumes events in a thread and publish them to all registered subscribers.
- #
- class LittleFanout
- def initialize
- @listeners = []
- end
-
- def publish(*args)
- @listeners.each { |l| l.publish(*args) }
- end
-
- def subscribe(pattern=nil, &block)
- @listeners << Listener.new(pattern, &block)
+ def subscribe(pattern = nil, &block)
+ @queue.bind(pattern).subscribe(&block)
end
def wait
- sleep 0.05 until drained?
+ @queue.wait
end
- private
- def drained?
- @listeners.all? &:drained?
- end
-
- # Used for internal implementation only.
- class Listener #:nodoc:
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
-
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
+ delegate :instrument, :to => :current_instrumenter
- def drained?
- @queue.size.zero?
+ private
+ def current_instrumenter
+ Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self)
end
- end
end
end
-
- Notifications.queue = Notifications::LittleFanout.new
end
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
new file mode 100644
index 0000000000..412d977b25
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -0,0 +1,84 @@
+require 'thread'
+
+module ActiveSupport
+ module Notifications
+ # This is a default queue implementation that ships with Notifications. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class Fanout
+ def initialize
+ @subscribers = []
+ end
+
+ def bind(pattern)
+ Binding.new(self, pattern)
+ end
+
+ def subscribe(pattern = nil, &block)
+ @subscribers << Subscriber.new(pattern, &block)
+ end
+
+ def publish(*args)
+ @subscribers.each { |s| s.publish(*args) }
+ end
+
+ def wait
+ sleep(0.05) until @subscribers.all?(&:drained?)
+ end
+
+ # Used for internal implementation only.
+ class Binding #:nodoc:
+ def initialize(queue, pattern)
+ @queue, @pattern = queue, pattern
+ end
+
+ def subscribe(&block)
+ @queue.subscribe(@pattern, &block)
+ end
+ end
+
+ # Used for internal implementation only.
+ class Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ @pattern =
+ case pattern
+ when Regexp, NilClass
+ pattern
+ else
+ /^#{Regexp.escape(pattern.to_s)}/
+ end
+ @block = block
+ @events = Queue.new
+ start_consumer
+ end
+
+ def publish(name, *args)
+ push(name, args) if matches?(name)
+ end
+
+ def consume
+ while args = @events.shift
+ @block.call(*args)
+ end
+ end
+
+ def drained?
+ @events.size.zero?
+ end
+
+ private
+ def start_consumer
+ Thread.new { consume }
+ end
+
+ def matches?(name)
+ !@pattern || @pattern =~ name.to_s
+ end
+
+ def push(name, args)
+ @events << args.unshift(name)
+ end
+ end
+ end
+ end
+end
diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb
new file mode 100644
index 0000000000..fb95422af2
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/instrumenter.rb
@@ -0,0 +1,47 @@
+require 'active_support/secure_random'
+require 'active_support/core_ext/module/delegation'
+
+module ActiveSupport
+ module Notifications
+ class Instrumenter
+ def initialize(notifier)
+ @id = unique_id
+ @notifier = notifier
+ end
+
+ def instrument(name, payload={})
+ time = Time.now
+ result = yield if block_given?
+ ensure
+ @notifier.publish(name, time, Time.now, result, @id, payload)
+ end
+
+ private
+ def unique_id
+ SecureRandom.hex(10)
+ end
+ end
+
+ class Event
+ attr_reader :name, :time, :end, :transaction_id, :result, :payload
+
+ def initialize(name, start, ending, result, transaction_id, payload)
+ @name = name
+ @payload = payload.dup
+ @time = start
+ @transaction_id = transaction_id
+ @end = ending
+ @result = result
+ end
+
+ def duration
+ @duration ||= 1000.0 * (@end - @time)
+ end
+
+ def parent_of?(event)
+ start = (self.time - event.time) * 1000
+ start <= 0 && (start + duration >= event.duration)
+ end
+ end
+ end
+end