aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications
diff options
context:
space:
mode:
authorJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:49:07 -0800
committerJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:50:09 -0800
commit4f2a04cc085b9117e8af8079a95a063f671d7a3d (patch)
treefd96c9bc52dc13d3338ed08f4afcd1c685dc6348 /activesupport/lib/active_support/notifications
parent02893d17053123cbf02b65c2fe549421c11a2604 (diff)
downloadrails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.gz
rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.bz2
rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.zip
Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns
Diffstat (limited to 'activesupport/lib/active_support/notifications')
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb84
-rw-r--r--activesupport/lib/active_support/notifications/instrumenter.rb47
2 files changed, 131 insertions, 0 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
new file mode 100644
index 0000000000..412d977b25
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -0,0 +1,84 @@
+require 'thread'
+
+module ActiveSupport
+ module Notifications
+ # This is a default queue implementation that ships with Notifications. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class Fanout
+ def initialize
+ @subscribers = []
+ end
+
+ def bind(pattern)
+ Binding.new(self, pattern)
+ end
+
+ def subscribe(pattern = nil, &block)
+ @subscribers << Subscriber.new(pattern, &block)
+ end
+
+ def publish(*args)
+ @subscribers.each { |s| s.publish(*args) }
+ end
+
+ def wait
+ sleep(0.05) until @subscribers.all?(&:drained?)
+ end
+
+ # Used for internal implementation only.
+ class Binding #:nodoc:
+ def initialize(queue, pattern)
+ @queue, @pattern = queue, pattern
+ end
+
+ def subscribe(&block)
+ @queue.subscribe(@pattern, &block)
+ end
+ end
+
+ # Used for internal implementation only.
+ class Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ @pattern =
+ case pattern
+ when Regexp, NilClass
+ pattern
+ else
+ /^#{Regexp.escape(pattern.to_s)}/
+ end
+ @block = block
+ @events = Queue.new
+ start_consumer
+ end
+
+ def publish(name, *args)
+ push(name, args) if matches?(name)
+ end
+
+ def consume
+ while args = @events.shift
+ @block.call(*args)
+ end
+ end
+
+ def drained?
+ @events.size.zero?
+ end
+
+ private
+ def start_consumer
+ Thread.new { consume }
+ end
+
+ def matches?(name)
+ !@pattern || @pattern =~ name.to_s
+ end
+
+ def push(name, args)
+ @events << args.unshift(name)
+ end
+ end
+ end
+ end
+end
diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb
new file mode 100644
index 0000000000..fb95422af2
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/instrumenter.rb
@@ -0,0 +1,47 @@
+require 'active_support/secure_random'
+require 'active_support/core_ext/module/delegation'
+
+module ActiveSupport
+ module Notifications
+ class Instrumenter
+ def initialize(notifier)
+ @id = unique_id
+ @notifier = notifier
+ end
+
+ def instrument(name, payload={})
+ time = Time.now
+ result = yield if block_given?
+ ensure
+ @notifier.publish(name, time, Time.now, result, @id, payload)
+ end
+
+ private
+ def unique_id
+ SecureRandom.hex(10)
+ end
+ end
+
+ class Event
+ attr_reader :name, :time, :end, :transaction_id, :result, :payload
+
+ def initialize(name, start, ending, result, transaction_id, payload)
+ @name = name
+ @payload = payload.dup
+ @time = start
+ @transaction_id = transaction_id
+ @end = ending
+ @result = result
+ end
+
+ def duration
+ @duration ||= 1000.0 * (@end - @time)
+ end
+
+ def parent_of?(event)
+ start = (self.time - event.time) * 1000
+ start <= 0 && (start + duration >= event.duration)
+ end
+ end
+ end
+end