aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:49:07 -0800
committerJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:50:09 -0800
commit4f2a04cc085b9117e8af8079a95a063f671d7a3d (patch)
treefd96c9bc52dc13d3338ed08f4afcd1c685dc6348
parent02893d17053123cbf02b65c2fe549421c11a2604 (diff)
downloadrails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.gz
rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.bz2
rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.zip
Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns
-rw-r--r--actionpack/test/controller/caching_test.rb4
-rw-r--r--activesupport/lib/active_support/notifications.rb148
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb84
-rw-r--r--activesupport/lib/active_support/notifications/instrumenter.rb47
-rw-r--r--activesupport/test/notifications_test.rb270
-rw-r--r--railties/test/application/notifications_test.rb30
6 files changed, 268 insertions, 315 deletions
diff --git a/actionpack/test/controller/caching_test.rb b/actionpack/test/controller/caching_test.rb
index 3ce90b6ccf..54de920740 100644
--- a/actionpack/test/controller/caching_test.rb
+++ b/actionpack/test/controller/caching_test.rb
@@ -632,13 +632,15 @@ class FragmentCachingTest < ActionController::TestCase
def test_fragment_for_logging
fragment_computed = false
- ActiveSupport::Notifications.queue.expects(:publish).times(2)
+ events = []
+ ActiveSupport::Notifications.subscribe { |*args| events << args }
buffer = 'generated till now -> '
@controller.fragment_for(buffer, 'expensive') { fragment_computed = true }
assert fragment_computed
assert_equal 'generated till now -> ', buffer
+ assert_equal [:fragment_exist?, :write_fragment], events.map(&:first)
end
end
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index 316d80e064..d9bfcbfcab 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -1,7 +1,4 @@
-require 'thread'
require 'active_support/core_ext/module/delegation'
-require 'active_support/core_ext/module/attribute_accessors'
-require 'active_support/secure_random'
module ActiveSupport
# Notifications provides an instrumentation API for Ruby. To instrument an
@@ -41,155 +38,42 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue
+ autoload :Instrumenter, 'active_support/notifications/instrumenter'
+ autoload :Event, 'active_support/notifications/instrumenter'
+ autoload :Fanout, 'active_support/notifications/fanout'
class << self
- delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
+ attr_writer :notifier
+ delegate :publish, :subscribe, :instrument, :to => :notifier
- def instrumenter
- Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
- end
-
- def publisher
- @publisher ||= Publisher.new(queue)
- end
-
- def subscribe(pattern=nil, &block)
- Subscriber.new(queue).bind(pattern).subscribe(&block)
+ def notifier
+ @notifier ||= Notifier.new
end
end
- class Instrumenter
- def initialize(publisher)
- @publisher = publisher
- @id = random_id
- end
-
- def transaction
- @id, old_id = random_id, @id
- yield
- ensure
- @id = old_id
- end
-
- def transaction_id
- @id
- end
-
- def instrument(name, payload={})
- time = Time.now
- result = yield if block_given?
- ensure
- @publisher.publish(name, time, Time.now, result, @id, payload)
- end
-
- private
- def random_id
- SecureRandom.hex(10)
- end
- end
-
- class Publisher
- def initialize(queue)
+ class Notifier
+ def initialize(queue = Fanout.new)
@queue = queue
end
def publish(*args)
@queue.publish(*args)
end
- end
-
- class Subscriber
- def initialize(queue)
- @queue = queue
- end
- def bind(pattern)
- @pattern = pattern
- self
- end
-
- def subscribe
- @queue.subscribe(@pattern) do |*args|
- yield(*args)
- end
- end
- end
-
- class Event
- attr_reader :name, :time, :end, :transaction_id, :result, :payload
-
- def initialize(name, start, ending, result, transaction_id, payload)
- @name = name
- @payload = payload.dup
- @time = start
- @transaction_id = transaction_id
- @end = ending
- @result = result
- end
-
- def duration
- @duration ||= 1000.0 * (@end - @time)
- end
-
- def parent_of?(event)
- start = (self.time - event.time) * 1000
- start <= 0 && (start + duration >= event.duration)
- end
- end
-
- # This is a default queue implementation that ships with Notifications. It
- # consumes events in a thread and publish them to all registered subscribers.
- #
- class LittleFanout
- def initialize
- @listeners = []
- end
-
- def publish(*args)
- @listeners.each { |l| l.publish(*args) }
- end
-
- def subscribe(pattern=nil, &block)
- @listeners << Listener.new(pattern, &block)
+ def subscribe(pattern = nil, &block)
+ @queue.bind(pattern).subscribe(&block)
end
def wait
- sleep 0.05 until drained?
+ @queue.wait
end
- private
- def drained?
- @listeners.all? &:drained?
- end
-
- # Used for internal implementation only.
- class Listener #:nodoc:
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
-
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
+ delegate :instrument, :to => :current_instrumenter
- def drained?
- @queue.size.zero?
+ private
+ def current_instrumenter
+ Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self)
end
- end
end
end
-
- Notifications.queue = Notifications::LittleFanout.new
end
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
new file mode 100644
index 0000000000..412d977b25
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -0,0 +1,84 @@
+require 'thread'
+
+module ActiveSupport
+ module Notifications
+ # This is a default queue implementation that ships with Notifications. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class Fanout
+ def initialize
+ @subscribers = []
+ end
+
+ def bind(pattern)
+ Binding.new(self, pattern)
+ end
+
+ def subscribe(pattern = nil, &block)
+ @subscribers << Subscriber.new(pattern, &block)
+ end
+
+ def publish(*args)
+ @subscribers.each { |s| s.publish(*args) }
+ end
+
+ def wait
+ sleep(0.05) until @subscribers.all?(&:drained?)
+ end
+
+ # Used for internal implementation only.
+ class Binding #:nodoc:
+ def initialize(queue, pattern)
+ @queue, @pattern = queue, pattern
+ end
+
+ def subscribe(&block)
+ @queue.subscribe(@pattern, &block)
+ end
+ end
+
+ # Used for internal implementation only.
+ class Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ @pattern =
+ case pattern
+ when Regexp, NilClass
+ pattern
+ else
+ /^#{Regexp.escape(pattern.to_s)}/
+ end
+ @block = block
+ @events = Queue.new
+ start_consumer
+ end
+
+ def publish(name, *args)
+ push(name, args) if matches?(name)
+ end
+
+ def consume
+ while args = @events.shift
+ @block.call(*args)
+ end
+ end
+
+ def drained?
+ @events.size.zero?
+ end
+
+ private
+ def start_consumer
+ Thread.new { consume }
+ end
+
+ def matches?(name)
+ !@pattern || @pattern =~ name.to_s
+ end
+
+ def push(name, args)
+ @events << args.unshift(name)
+ end
+ end
+ end
+ end
+end
diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb
new file mode 100644
index 0000000000..fb95422af2
--- /dev/null
+++ b/activesupport/lib/active_support/notifications/instrumenter.rb
@@ -0,0 +1,47 @@
+require 'active_support/secure_random'
+require 'active_support/core_ext/module/delegation'
+
+module ActiveSupport
+ module Notifications
+ class Instrumenter
+ def initialize(notifier)
+ @id = unique_id
+ @notifier = notifier
+ end
+
+ def instrument(name, payload={})
+ time = Time.now
+ result = yield if block_given?
+ ensure
+ @notifier.publish(name, time, Time.now, result, @id, payload)
+ end
+
+ private
+ def unique_id
+ SecureRandom.hex(10)
+ end
+ end
+
+ class Event
+ attr_reader :name, :time, :end, :transaction_id, :result, :payload
+
+ def initialize(name, start, ending, result, transaction_id, payload)
+ @name = name
+ @payload = payload.dup
+ @time = start
+ @transaction_id = transaction_id
+ @end = ending
+ @result = result
+ end
+
+ def duration
+ @duration ||= 1000.0 * (@end - @time)
+ end
+
+ def parent_of?(event)
+ start = (self.time - event.time) * 1000
+ start <= 0 && (start + duration >= event.duration)
+ end
+ end
+ end
+end
diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb
index 3df2088ac9..93c61b2c83 100644
--- a/activesupport/test/notifications_test.rb
+++ b/activesupport/test/notifications_test.rb
@@ -1,206 +1,160 @@
require 'abstract_unit'
-# Allow LittleFanout to be cleaned.
-class ActiveSupport::Notifications::LittleFanout
- def clear
- @listeners.clear
- end
-end
+module Notifications
+ class TestCase < ActiveSupport::TestCase
+ def setup
+ Thread.abort_on_exception = true
+
+ @notifier = ActiveSupport::Notifications::Notifier.new
+ @events = []
+ @notifier.subscribe { |*args| @events << event(*args) }
+ end
-class NotificationsEventTest < Test::Unit::TestCase
- def test_events_are_initialized_with_details
- event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar)
- assert_equal :foo, event.name
- assert_equal Hash[:payload => :bar], event.payload
- end
+ def teardown
+ Thread.abort_on_exception = false
+ end
- def test_events_consumes_information_given_as_payload
- time = Time.now
- event = event(:foo, time, time + 0.01, 1, random_id, {})
+ private
+ def event(*args)
+ ActiveSupport::Notifications::Event.new(*args)
+ end
- assert_equal Hash.new, event.payload
- assert_equal time, event.time
- assert_equal 1, event.result
- assert_equal 10.0, event.duration
+ def drain
+ @notifier.wait
+ end
end
- def test_event_is_parent_based_on_time_frame
- time = Time.utc(2009, 01, 01, 0, 0, 1)
+ class PubSubTest < TestCase
+ def test_events_are_published_to_a_listener
+ @notifier.publish :foo
+ @notifier.wait
+ assert_equal [[:foo]], @events
+ end
- parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {})
- child = event(:foo, time, time + 10, nil, random_id, {})
- not_child = event(:foo, time, time + 100, nil, random_id, {})
+ def test_subscriber_with_pattern
+ events = []
+ @notifier.subscribe('1') { |*args| events << args }
- assert parent.parent_of?(child)
- assert !child.parent_of?(parent)
- assert !parent.parent_of?(not_child)
- assert !not_child.parent_of?(parent)
- end
+ @notifier.publish '1'
+ @notifier.publish '1.a'
+ @notifier.publish 'a.1'
+ @notifier.wait
-protected
+ assert_equal [['1'], ['1.a']], events
+ end
- def random_id
- @random_id ||= ActiveSupport::SecureRandom.hex(10)
- end
+ def test_subscriber_with_pattern_as_regexp
+ events = []
+ @notifier.subscribe(/\d/) { |*args| events << args }
- def event(*args)
- ActiveSupport::Notifications::Event.new(*args)
- end
-end
+ @notifier.publish '1'
+ @notifier.publish 'a.1'
+ @notifier.publish '1.a'
+ @notifier.wait
-class NotificationsMainTest < Test::Unit::TestCase
- def setup
- @events = []
- Thread.abort_on_exception = true
- ActiveSupport::Notifications.subscribe do |*args|
- @events << ActiveSupport::Notifications::Event.new(*args)
+ assert_equal [['1'], ['a.1'], ['1.a']], events
end
- end
- def teardown
- Thread.abort_on_exception = false
- ActiveSupport::Notifications.queue.clear
- end
+ def test_multiple_subscribers
+ @another = []
+ @notifier.subscribe { |*args| @another << args }
+ @notifier.publish :foo
+ @notifier.wait
- def test_notifications_returns_action_result
- result = ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
- 1 + 1
+ assert_equal [[:foo]], @events
+ assert_equal [[:foo]], @another
end
- assert_equal 2, result
+ private
+ def event(*args)
+ args
+ end
end
- def test_events_are_published_to_a_listener
- ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
- 1 + 1
+ class InstrumentationTest < TestCase
+ def test_instrument_returns_block_result
+ assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 }
end
- drain
+ def test_nested_events_can_be_instrumented
+ @notifier.instrument(:awesome, :payload => "notifications") do
+ @notifier.instrument(:wot, :payload => "child") do
+ 1 + 1
+ end
- assert_equal 1, @events.size
- assert_equal :awesome, @events.last.name
- assert_equal Hash[:payload => "notifications"], @events.last.payload
- end
+ drain
- def test_nested_events_can_be_instrumented
- ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
- ActiveSupport::Notifications.instrument(:wot, :payload => "child") do
- 1 + 1
+ assert_equal 1, @events.size
+ assert_equal :wot, @events.first.name
+ assert_equal Hash[:payload => "child"], @events.first.payload
end
drain
- assert_equal 1, @events.size
- assert_equal :wot, @events.first.name
- assert_equal Hash[:payload => "child"], @events.first.payload
+ assert_equal 2, @events.size
+ assert_equal :awesome, @events.last.name
+ assert_equal Hash[:payload => "notifications"], @events.last.payload
end
- drain
-
- assert_equal 2, @events.size
- assert_equal :awesome, @events.last.name
- assert_equal Hash[:payload => "notifications"], @events.last.payload
- end
-
- def test_event_is_pushed_even_if_block_fails
- ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do
- raise "OMG"
- end rescue RuntimeError
-
- drain
-
- assert_equal 1, @events.size
- assert_equal :awesome, @events.last.name
- assert_equal Hash[:payload => "notifications"], @events.last.payload
- end
-
- def test_event_is_pushed_even_without_block
- ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications")
- drain
-
- assert_equal 1, @events.size
- assert_equal :awesome, @events.last.name
- assert_equal Hash[:payload => "notifications"], @events.last.payload
- end
-
- def test_subscribed_in_a_transaction
- @another = []
+ def test_instrument_publishes_when_exception_is_raised
+ begin
+ @notifier.instrument(:awesome, :payload => "notifications") do
+ raise "OMG"
+ end
+ flunk
+ rescue
+ end
- ActiveSupport::Notifications.subscribe("cache") do |*args|
- @another << ActiveSupport::Notifications::Event.new(*args)
- end
+ drain
- ActiveSupport::Notifications.instrument(:cache){ 1 }
- ActiveSupport::Notifications.transaction do
- ActiveSupport::Notifications.instrument(:cache){ 1 }
+ assert_equal 1, @events.size
+ assert_equal :awesome, @events.last.name
+ assert_equal Hash[:payload => "notifications"], @events.last.payload
end
- ActiveSupport::Notifications.instrument(:cache){ 1 }
-
- drain
- assert_equal 3, @another.size
- before, during, after = @another.map {|e| e.transaction_id }
- assert_equal before, after
- assert_not_equal before, during
- end
-
- def test_subscriber_with_pattern
- @another = []
+ def test_event_is_pushed_even_without_block
+ @notifier.instrument(:awesome, :payload => "notifications")
+ drain
- ActiveSupport::Notifications.subscribe("cache") do |*args|
- @another << ActiveSupport::Notifications::Event.new(*args)
+ assert_equal 1, @events.size
+ assert_equal :awesome, @events.last.name
+ assert_equal Hash[:payload => "notifications"], @events.last.payload
end
-
- ActiveSupport::Notifications.instrument(:cache){ 1 }
-
- drain
-
- assert_equal 1, @another.size
- assert_equal :cache, @another.first.name
- assert_equal 1, @another.first.result
end
- def test_subscriber_with_pattern_as_regexp
- @another = []
- ActiveSupport::Notifications.subscribe(/cache/) do |*args|
- @another << ActiveSupport::Notifications::Event.new(*args)
+ class EventTest < TestCase
+ def test_events_are_initialized_with_details
+ event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar)
+ assert_equal :foo, event.name
+ assert_equal Hash[:payload => :bar], event.payload
end
- ActiveSupport::Notifications.instrument(:something){ 0 }
- ActiveSupport::Notifications.instrument(:cache){ 1 }
-
- drain
-
- assert_equal 1, @another.size
- assert_equal :cache, @another.first.name
- assert_equal 1, @another.first.result
- end
+ def test_events_consumes_information_given_as_payload
+ time = Time.now
+ event = event(:foo, time, time + 0.01, 1, random_id, {})
- def test_with_several_consumers_and_several_events
- @another = []
- ActiveSupport::Notifications.subscribe do |*args|
- @another << ActiveSupport::Notifications::Event.new(*args)
+ assert_equal Hash.new, event.payload
+ assert_equal time, event.time
+ assert_equal 1, event.result
+ assert_equal 10.0, event.duration
end
- 1.upto(100) do |i|
- ActiveSupport::Notifications.instrument(:value){ i }
- end
+ def test_event_is_parent_based_on_time_frame
+ time = Time.utc(2009, 01, 01, 0, 0, 1)
- drain
+ parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {})
+ child = event(:foo, time, time + 10, nil, random_id, {})
+ not_child = event(:foo, time, time + 100, nil, random_id, {})
- assert_equal 100, @events.size
- assert_equal :value, @events.first.name
- assert_equal 1, @events.first.result
- assert_equal 100, @events.last.result
+ assert parent.parent_of?(child)
+ assert !child.parent_of?(parent)
+ assert !parent.parent_of?(not_child)
+ assert !not_child.parent_of?(parent)
+ end
- assert_equal 100, @another.size
- assert_equal :value, @another.first.name
- assert_equal 1, @another.first.result
- assert_equal 100, @another.last.result
+ protected
+ def random_id
+ @random_id ||= ActiveSupport::SecureRandom.hex(10)
+ end
end
-
- private
- def drain
- ActiveSupport::Notifications.queue.wait
- end
end
diff --git a/railties/test/application/notifications_test.rb b/railties/test/application/notifications_test.rb
index 28dfdfcd83..71e406f2c1 100644
--- a/railties/test/application/notifications_test.rb
+++ b/railties/test/application/notifications_test.rb
@@ -5,21 +5,8 @@ module ApplicationTests
include ActiveSupport::Testing::Isolation
class MyQueue
- attr_reader :events, :subscribers
-
- def initialize
- @events = []
- @subscribers = []
- @listeners = []
- end
-
def publish(name, *args)
- @events << name
- end
-
- def subscribe(listener, pattern=nil, &block)
- @listeners << listener
- @subscribers << pattern
+ raise name
end
end
@@ -28,21 +15,16 @@ module ApplicationTests
boot_rails
require "rails"
require "active_support/notifications"
+ @events = []
Rails::Initializer.run do |c|
- c.notifications.queue = MyQueue.new
- c.notifications.subscribe(/listening/) do
- puts "Cool"
- end
+ c.notifications.notifier = ActiveSupport::Notifications::Notifier.new(MyQueue.new)
end
end
test "new queue is set" do
- ActiveSupport::Notifications.instrument(:foo)
- assert_equal :foo, ActiveSupport::Notifications.queue.events.first
- end
-
- test "configuration subscribers are loaded" do
- assert_equal 1, ActiveSupport::Notifications.queue.subscribers.count { |s| s == /listening/ }
+ assert_raise RuntimeError do
+ ActiveSupport::Notifications.publish('foo')
+ end
end
end
end