diff options
author | Jeremy Kemper <jeremy@bitsweat.net> | 2009-11-28 12:49:07 -0800 |
---|---|---|
committer | Jeremy Kemper <jeremy@bitsweat.net> | 2009-11-28 12:50:09 -0800 |
commit | 4f2a04cc085b9117e8af8079a95a063f671d7a3d (patch) | |
tree | fd96c9bc52dc13d3338ed08f4afcd1c685dc6348 | |
parent | 02893d17053123cbf02b65c2fe549421c11a2604 (diff) | |
download | rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.gz rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.bz2 rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.zip |
Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns
-rw-r--r-- | actionpack/test/controller/caching_test.rb | 4 | ||||
-rw-r--r-- | activesupport/lib/active_support/notifications.rb | 148 | ||||
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 84 | ||||
-rw-r--r-- | activesupport/lib/active_support/notifications/instrumenter.rb | 47 | ||||
-rw-r--r-- | activesupport/test/notifications_test.rb | 270 | ||||
-rw-r--r-- | railties/test/application/notifications_test.rb | 30 |
6 files changed, 268 insertions, 315 deletions
diff --git a/actionpack/test/controller/caching_test.rb b/actionpack/test/controller/caching_test.rb index 3ce90b6ccf..54de920740 100644 --- a/actionpack/test/controller/caching_test.rb +++ b/actionpack/test/controller/caching_test.rb @@ -632,13 +632,15 @@ class FragmentCachingTest < ActionController::TestCase def test_fragment_for_logging fragment_computed = false - ActiveSupport::Notifications.queue.expects(:publish).times(2) + events = [] + ActiveSupport::Notifications.subscribe { |*args| events << args } buffer = 'generated till now -> ' @controller.fragment_for(buffer, 'expensive') { fragment_computed = true } assert fragment_computed assert_equal 'generated till now -> ', buffer + assert_equal [:fragment_exist?, :write_fragment], events.map(&:first) end end diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 316d80e064..d9bfcbfcab 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -1,7 +1,4 @@ -require 'thread' require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' -require 'active_support/secure_random' module ActiveSupport # Notifications provides an instrumentation API for Ruby. To instrument an @@ -41,155 +38,42 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + autoload :Instrumenter, 'active_support/notifications/instrumenter' + autoload :Event, 'active_support/notifications/instrumenter' + autoload :Fanout, 'active_support/notifications/fanout' class << self - delegate :instrument, :transaction_id, :transaction, :to => :instrumenter + attr_writer :notifier + delegate :publish, :subscribe, :instrument, :to => :notifier - def instrumenter - Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) - end - - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) + def notifier + @notifier ||= Notifier.new end end - class Instrumenter - def initialize(publisher) - @publisher = publisher - @id = random_id - end - - def transaction - @id, old_id = random_id, @id - yield - ensure - @id = old_id - end - - def transaction_id - @id - end - - def instrument(name, payload={}) - time = Time.now - result = yield if block_given? - ensure - @publisher.publish(name, time, Time.now, result, @id, payload) - end - - private - def random_id - SecureRandom.hex(10) - end - end - - class Publisher - def initialize(queue) + class Notifier + def initialize(queue = Fanout.new) @queue = queue end def publish(*args) @queue.publish(*args) end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - def bind(pattern) - @pattern = pattern - self - end - - def subscribe - @queue.subscribe(@pattern) do |*args| - yield(*args) - end - end - end - - class Event - attr_reader :name, :time, :end, :transaction_id, :result, :payload - - def initialize(name, start, ending, result, transaction_id, payload) - @name = name - @payload = payload.dup - @time = start - @transaction_id = transaction_id - @end = ending - @result = result - end - - def duration - @duration ||= 1000.0 * (@end - @time) - end - - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + duration >= event.duration) - end - end - - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners = [] - end - - def publish(*args) - @listeners.each { |l| l.publish(*args) } - end - - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) + def subscribe(pattern = nil, &block) + @queue.bind(pattern).subscribe(&block) end def wait - sleep 0.05 until drained? + @queue.wait end - private - def drained? - @listeners.all? &:drained? - end - - # Used for internal implementation only. - class Listener #:nodoc: - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end + delegate :instrument, :to => :current_instrumenter - def drained? - @queue.size.zero? + private + def current_instrumenter + Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self) end - end end end - - Notifications.queue = Notifications::LittleFanout.new end diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb new file mode 100644 index 0000000000..412d977b25 --- /dev/null +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -0,0 +1,84 @@ +require 'thread' + +module ActiveSupport + module Notifications + # This is a default queue implementation that ships with Notifications. It + # consumes events in a thread and publish them to all registered subscribers. + # + class Fanout + def initialize + @subscribers = [] + end + + def bind(pattern) + Binding.new(self, pattern) + end + + def subscribe(pattern = nil, &block) + @subscribers << Subscriber.new(pattern, &block) + end + + def publish(*args) + @subscribers.each { |s| s.publish(*args) } + end + + def wait + sleep(0.05) until @subscribers.all?(&:drained?) + end + + # Used for internal implementation only. + class Binding #:nodoc: + def initialize(queue, pattern) + @queue, @pattern = queue, pattern + end + + def subscribe(&block) + @queue.subscribe(@pattern, &block) + end + end + + # Used for internal implementation only. + class Subscriber #:nodoc: + def initialize(pattern, &block) + @pattern = + case pattern + when Regexp, NilClass + pattern + else + /^#{Regexp.escape(pattern.to_s)}/ + end + @block = block + @events = Queue.new + start_consumer + end + + def publish(name, *args) + push(name, args) if matches?(name) + end + + def consume + while args = @events.shift + @block.call(*args) + end + end + + def drained? + @events.size.zero? + end + + private + def start_consumer + Thread.new { consume } + end + + def matches?(name) + !@pattern || @pattern =~ name.to_s + end + + def push(name, args) + @events << args.unshift(name) + end + end + end + end +end diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb new file mode 100644 index 0000000000..fb95422af2 --- /dev/null +++ b/activesupport/lib/active_support/notifications/instrumenter.rb @@ -0,0 +1,47 @@ +require 'active_support/secure_random' +require 'active_support/core_ext/module/delegation' + +module ActiveSupport + module Notifications + class Instrumenter + def initialize(notifier) + @id = unique_id + @notifier = notifier + end + + def instrument(name, payload={}) + time = Time.now + result = yield if block_given? + ensure + @notifier.publish(name, time, Time.now, result, @id, payload) + end + + private + def unique_id + SecureRandom.hex(10) + end + end + + class Event + attr_reader :name, :time, :end, :transaction_id, :result, :payload + + def initialize(name, start, ending, result, transaction_id, payload) + @name = name + @payload = payload.dup + @time = start + @transaction_id = transaction_id + @end = ending + @result = result + end + + def duration + @duration ||= 1000.0 * (@end - @time) + end + + def parent_of?(event) + start = (self.time - event.time) * 1000 + start <= 0 && (start + duration >= event.duration) + end + end + end +end diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb index 3df2088ac9..93c61b2c83 100644 --- a/activesupport/test/notifications_test.rb +++ b/activesupport/test/notifications_test.rb @@ -1,206 +1,160 @@ require 'abstract_unit' -# Allow LittleFanout to be cleaned. -class ActiveSupport::Notifications::LittleFanout - def clear - @listeners.clear - end -end +module Notifications + class TestCase < ActiveSupport::TestCase + def setup + Thread.abort_on_exception = true + + @notifier = ActiveSupport::Notifications::Notifier.new + @events = [] + @notifier.subscribe { |*args| @events << event(*args) } + end -class NotificationsEventTest < Test::Unit::TestCase - def test_events_are_initialized_with_details - event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar) - assert_equal :foo, event.name - assert_equal Hash[:payload => :bar], event.payload - end + def teardown + Thread.abort_on_exception = false + end - def test_events_consumes_information_given_as_payload - time = Time.now - event = event(:foo, time, time + 0.01, 1, random_id, {}) + private + def event(*args) + ActiveSupport::Notifications::Event.new(*args) + end - assert_equal Hash.new, event.payload - assert_equal time, event.time - assert_equal 1, event.result - assert_equal 10.0, event.duration + def drain + @notifier.wait + end end - def test_event_is_parent_based_on_time_frame - time = Time.utc(2009, 01, 01, 0, 0, 1) + class PubSubTest < TestCase + def test_events_are_published_to_a_listener + @notifier.publish :foo + @notifier.wait + assert_equal [[:foo]], @events + end - parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {}) - child = event(:foo, time, time + 10, nil, random_id, {}) - not_child = event(:foo, time, time + 100, nil, random_id, {}) + def test_subscriber_with_pattern + events = [] + @notifier.subscribe('1') { |*args| events << args } - assert parent.parent_of?(child) - assert !child.parent_of?(parent) - assert !parent.parent_of?(not_child) - assert !not_child.parent_of?(parent) - end + @notifier.publish '1' + @notifier.publish '1.a' + @notifier.publish 'a.1' + @notifier.wait -protected + assert_equal [['1'], ['1.a']], events + end - def random_id - @random_id ||= ActiveSupport::SecureRandom.hex(10) - end + def test_subscriber_with_pattern_as_regexp + events = [] + @notifier.subscribe(/\d/) { |*args| events << args } - def event(*args) - ActiveSupport::Notifications::Event.new(*args) - end -end + @notifier.publish '1' + @notifier.publish 'a.1' + @notifier.publish '1.a' + @notifier.wait -class NotificationsMainTest < Test::Unit::TestCase - def setup - @events = [] - Thread.abort_on_exception = true - ActiveSupport::Notifications.subscribe do |*args| - @events << ActiveSupport::Notifications::Event.new(*args) + assert_equal [['1'], ['a.1'], ['1.a']], events end - end - def teardown - Thread.abort_on_exception = false - ActiveSupport::Notifications.queue.clear - end + def test_multiple_subscribers + @another = [] + @notifier.subscribe { |*args| @another << args } + @notifier.publish :foo + @notifier.wait - def test_notifications_returns_action_result - result = ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - 1 + 1 + assert_equal [[:foo]], @events + assert_equal [[:foo]], @another end - assert_equal 2, result + private + def event(*args) + args + end end - def test_events_are_published_to_a_listener - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - 1 + 1 + class InstrumentationTest < TestCase + def test_instrument_returns_block_result + assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 } end - drain + def test_nested_events_can_be_instrumented + @notifier.instrument(:awesome, :payload => "notifications") do + @notifier.instrument(:wot, :payload => "child") do + 1 + 1 + end - assert_equal 1, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload - end + drain - def test_nested_events_can_be_instrumented - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - ActiveSupport::Notifications.instrument(:wot, :payload => "child") do - 1 + 1 + assert_equal 1, @events.size + assert_equal :wot, @events.first.name + assert_equal Hash[:payload => "child"], @events.first.payload end drain - assert_equal 1, @events.size - assert_equal :wot, @events.first.name - assert_equal Hash[:payload => "child"], @events.first.payload + assert_equal 2, @events.size + assert_equal :awesome, @events.last.name + assert_equal Hash[:payload => "notifications"], @events.last.payload end - drain - - assert_equal 2, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload - end - - def test_event_is_pushed_even_if_block_fails - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - raise "OMG" - end rescue RuntimeError - - drain - - assert_equal 1, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload - end - - def test_event_is_pushed_even_without_block - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") - drain - - assert_equal 1, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload - end - - def test_subscribed_in_a_transaction - @another = [] + def test_instrument_publishes_when_exception_is_raised + begin + @notifier.instrument(:awesome, :payload => "notifications") do + raise "OMG" + end + flunk + rescue + end - ActiveSupport::Notifications.subscribe("cache") do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) - end + drain - ActiveSupport::Notifications.instrument(:cache){ 1 } - ActiveSupport::Notifications.transaction do - ActiveSupport::Notifications.instrument(:cache){ 1 } + assert_equal 1, @events.size + assert_equal :awesome, @events.last.name + assert_equal Hash[:payload => "notifications"], @events.last.payload end - ActiveSupport::Notifications.instrument(:cache){ 1 } - - drain - assert_equal 3, @another.size - before, during, after = @another.map {|e| e.transaction_id } - assert_equal before, after - assert_not_equal before, during - end - - def test_subscriber_with_pattern - @another = [] + def test_event_is_pushed_even_without_block + @notifier.instrument(:awesome, :payload => "notifications") + drain - ActiveSupport::Notifications.subscribe("cache") do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + assert_equal 1, @events.size + assert_equal :awesome, @events.last.name + assert_equal Hash[:payload => "notifications"], @events.last.payload end - - ActiveSupport::Notifications.instrument(:cache){ 1 } - - drain - - assert_equal 1, @another.size - assert_equal :cache, @another.first.name - assert_equal 1, @another.first.result end - def test_subscriber_with_pattern_as_regexp - @another = [] - ActiveSupport::Notifications.subscribe(/cache/) do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + class EventTest < TestCase + def test_events_are_initialized_with_details + event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar) + assert_equal :foo, event.name + assert_equal Hash[:payload => :bar], event.payload end - ActiveSupport::Notifications.instrument(:something){ 0 } - ActiveSupport::Notifications.instrument(:cache){ 1 } - - drain - - assert_equal 1, @another.size - assert_equal :cache, @another.first.name - assert_equal 1, @another.first.result - end + def test_events_consumes_information_given_as_payload + time = Time.now + event = event(:foo, time, time + 0.01, 1, random_id, {}) - def test_with_several_consumers_and_several_events - @another = [] - ActiveSupport::Notifications.subscribe do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + assert_equal Hash.new, event.payload + assert_equal time, event.time + assert_equal 1, event.result + assert_equal 10.0, event.duration end - 1.upto(100) do |i| - ActiveSupport::Notifications.instrument(:value){ i } - end + def test_event_is_parent_based_on_time_frame + time = Time.utc(2009, 01, 01, 0, 0, 1) - drain + parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {}) + child = event(:foo, time, time + 10, nil, random_id, {}) + not_child = event(:foo, time, time + 100, nil, random_id, {}) - assert_equal 100, @events.size - assert_equal :value, @events.first.name - assert_equal 1, @events.first.result - assert_equal 100, @events.last.result + assert parent.parent_of?(child) + assert !child.parent_of?(parent) + assert !parent.parent_of?(not_child) + assert !not_child.parent_of?(parent) + end - assert_equal 100, @another.size - assert_equal :value, @another.first.name - assert_equal 1, @another.first.result - assert_equal 100, @another.last.result + protected + def random_id + @random_id ||= ActiveSupport::SecureRandom.hex(10) + end end - - private - def drain - ActiveSupport::Notifications.queue.wait - end end diff --git a/railties/test/application/notifications_test.rb b/railties/test/application/notifications_test.rb index 28dfdfcd83..71e406f2c1 100644 --- a/railties/test/application/notifications_test.rb +++ b/railties/test/application/notifications_test.rb @@ -5,21 +5,8 @@ module ApplicationTests include ActiveSupport::Testing::Isolation class MyQueue - attr_reader :events, :subscribers - - def initialize - @events = [] - @subscribers = [] - @listeners = [] - end - def publish(name, *args) - @events << name - end - - def subscribe(listener, pattern=nil, &block) - @listeners << listener - @subscribers << pattern + raise name end end @@ -28,21 +15,16 @@ module ApplicationTests boot_rails require "rails" require "active_support/notifications" + @events = [] Rails::Initializer.run do |c| - c.notifications.queue = MyQueue.new - c.notifications.subscribe(/listening/) do - puts "Cool" - end + c.notifications.notifier = ActiveSupport::Notifications::Notifier.new(MyQueue.new) end end test "new queue is set" do - ActiveSupport::Notifications.instrument(:foo) - assert_equal :foo, ActiveSupport::Notifications.queue.events.first - end - - test "configuration subscribers are loaded" do - assert_equal 1, ActiveSupport::Notifications.queue.subscribers.count { |s| s == /listening/ } + assert_raise RuntimeError do + ActiveSupport::Notifications.publish('foo') + end end end end |