diff options
author | José Valim <jose.valim@gmail.com> | 2009-12-01 13:00:34 -0200 |
---|---|---|
committer | José Valim <jose.valim@gmail.com> | 2009-12-01 13:00:34 -0200 |
commit | c2e97cb410d759f383d29920165abdbf4b70e019 (patch) | |
tree | b375de84add24ee4c4551deec533b0667512bf34 /activesupport | |
parent | fc3629f6ca2b43693f5447a1fb43881f1814e117 (diff) | |
parent | 6ac32a83283f46b55675ddf4ecab6c91f6f8abde (diff) | |
download | rails-c2e97cb410d759f383d29920165abdbf4b70e019.tar.gz rails-c2e97cb410d759f383d29920165abdbf4b70e019.tar.bz2 rails-c2e97cb410d759f383d29920165abdbf4b70e019.zip |
Merge branch 'master' of git://github.com/rails/rails
Diffstat (limited to 'activesupport')
-rw-r--r-- | activesupport/lib/active_support/notifications.rb | 168 | ||||
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 101 | ||||
-rw-r--r-- | activesupport/lib/active_support/notifications/instrumenter.rb | 47 | ||||
-rw-r--r-- | activesupport/test/notifications_test.rb | 293 |
4 files changed, 286 insertions, 323 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index e2540cd598..d9bfcbfcab 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -1,7 +1,4 @@ -require 'thread' require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' -require 'active_support/secure_random' module ActiveSupport # Notifications provides an instrumentation API for Ruby. To instrument an @@ -41,173 +38,42 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue, :listener + autoload :Instrumenter, 'active_support/notifications/instrumenter' + autoload :Event, 'active_support/notifications/instrumenter' + autoload :Fanout, 'active_support/notifications/fanout' class << self - delegate :instrument, :transaction_id, :transaction, :to => :instrumenter + attr_writer :notifier + delegate :publish, :subscribe, :instrument, :to => :notifier - def instrumenter - Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) - end - - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscriber - @subscriber ||= Subscriber.new(queue) - end - - def subscribe(pattern=nil, options={}, &block) - with = options[:with] || listener - subscriber.bind(with, pattern).subscribe(&block) + def notifier + @notifier ||= Notifier.new end end - class Instrumenter - def initialize(publisher) - @publisher = publisher - @id = random_id - end - - def transaction - @id, old_id = random_id, @id - yield - ensure - @id = old_id - end - - def transaction_id - @id - end - - def instrument(name, payload={}) - time = Time.now - result = yield if block_given? - ensure - @publisher.publish(name, time, Time.now, result, @id, payload) - end - - private - def random_id - SecureRandom.hex(10) - end - end - - class Publisher - def initialize(queue) + class Notifier + def initialize(queue = Fanout.new) @queue = queue end def publish(*args) @queue.publish(*args) end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - - def bind(listener, pattern) - @listener = listener - @pattern = pattern - self - end - def subscribe - @queue.subscribe(@listener, @pattern) do |*args| - yield(*args) - end + def subscribe(pattern = nil, &block) + @queue.bind(pattern).subscribe(&block) end - end - - class Event - attr_reader :name, :time, :end, :transaction_id, :result, :payload - def initialize(name, start, ending, result, transaction_id, payload) - @name = name - @payload = payload.dup - @time = start - @transaction_id = transaction_id - @end = ending - @result = result + def wait + @queue.wait end - def duration - @duration ||= 1000.0 * (@end - @time) - end + delegate :instrument, :to => :current_instrumenter - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + duration >= event.duration) - end - end - - class AsyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end - - def drained? - @queue.size.zero? - end - end - - class SyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @subscriber.call(*args.unshift(name)) + private + def current_instrumenter + Thread.current[:"instrumentation_#{object_id}"] ||= Notifications::Instrumenter.new(self) end - end - - def drained? - true - end - end - - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners = [] - end - - def publish(*args) - @listeners.each { |l| l.publish(*args) } - end - - def subscribe(listener, pattern=nil, &block) - @listeners << listener.new(pattern, &block) - end - - def drained? - @listeners.all? &:drained? - end end end - - Notifications.queue = Notifications::LittleFanout.new - Notifications.listener = Notifications::AsyncListener end diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb new file mode 100644 index 0000000000..bb07e4765c --- /dev/null +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -0,0 +1,101 @@ +require 'thread' + +module ActiveSupport + module Notifications + # This is a default queue implementation that ships with Notifications. It + # consumes events in a thread and publish them to all registered subscribers. + # + class Fanout + def initialize(sync = false) + @subscriber_klass = sync ? Subscriber : AsyncSubscriber + @subscribers = [] + end + + def bind(pattern) + Binding.new(self, pattern) + end + + def subscribe(pattern = nil, &block) + @subscribers << @subscriber_klass.new(pattern, &block) + end + + def publish(*args) + @subscribers.each { |s| s.publish(*args) } + end + + def wait + sleep(0.05) until @subscribers.all?(&:drained?) + end + + # Used for internal implementation only. + class Binding #:nodoc: + def initialize(queue, pattern) + @queue = queue + @pattern = + case pattern + when Regexp, NilClass + pattern + else + /^#{Regexp.escape(pattern.to_s)}/ + end + end + + def subscribe(&block) + @queue.subscribe(@pattern, &block) + end + end + + class Subscriber #:nodoc: + def initialize(pattern, &block) + @pattern = pattern + @block = block + end + + def publish(*args) + push(*args) if matches?(args.first) + end + + def drained? + true + end + + private + def matches?(name) + !@pattern || @pattern =~ name.to_s + end + + def push(*args) + @block.call(*args) + end + end + + # Used for internal implementation only. + class AsyncSubscriber < Subscriber #:nodoc: + def initialize(pattern, &block) + super + @events = Queue.new + start_consumer + end + + def drained? + @events.empty? + end + + private + def start_consumer + Thread.new { consume } + end + + def consume + while args = @events.shift + @block.call(*args) + end + end + + def push(*args) + @events << args + end + end + end + end +end diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb new file mode 100644 index 0000000000..fb95422af2 --- /dev/null +++ b/activesupport/lib/active_support/notifications/instrumenter.rb @@ -0,0 +1,47 @@ +require 'active_support/secure_random' +require 'active_support/core_ext/module/delegation' + +module ActiveSupport + module Notifications + class Instrumenter + def initialize(notifier) + @id = unique_id + @notifier = notifier + end + + def instrument(name, payload={}) + time = Time.now + result = yield if block_given? + ensure + @notifier.publish(name, time, Time.now, result, @id, payload) + end + + private + def unique_id + SecureRandom.hex(10) + end + end + + class Event + attr_reader :name, :time, :end, :transaction_id, :result, :payload + + def initialize(name, start, ending, result, transaction_id, payload) + @name = name + @payload = payload.dup + @time = start + @transaction_id = transaction_id + @end = ending + @result = result + end + + def duration + @duration ||= 1000.0 * (@end - @time) + end + + def parent_of?(event) + start = (self.time - event.time) * 1000 + start <= 0 && (start + duration >= event.duration) + end + end + end +end diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb index 35d44367cf..4f880d0db7 100644 --- a/activesupport/test/notifications_test.rb +++ b/activesupport/test/notifications_test.rb @@ -1,221 +1,170 @@ require 'abstract_unit' -# Allow LittleFanout to be cleaned. -class ActiveSupport::Notifications::LittleFanout - def clear - @listeners.clear - end -end - -class NotificationsEventTest < Test::Unit::TestCase - def test_events_are_initialized_with_details - event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar) - assert_equal :foo, event.name - assert_equal Hash[:payload => :bar], event.payload - end - - def test_events_consumes_information_given_as_payload - time = Time.now - event = event(:foo, time, time + 0.01, 1, random_id, {}) - - assert_equal Hash.new, event.payload - assert_equal time, event.time - assert_equal 1, event.result - assert_equal 10.0, event.duration - end - - def test_event_is_parent_based_on_time_frame - time = Time.utc(2009, 01, 01, 0, 0, 1) - - parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {}) - child = event(:foo, time, time + 10, nil, random_id, {}) - not_child = event(:foo, time, time + 100, nil, random_id, {}) - - assert parent.parent_of?(child) - assert !child.parent_of?(parent) - assert !parent.parent_of?(not_child) - assert !not_child.parent_of?(parent) - end +module Notifications + class TestCase < ActiveSupport::TestCase + def setup + Thread.abort_on_exception = true + + @notifier = ActiveSupport::Notifications::Notifier.new + @events = [] + @notifier.subscribe { |*args| @events << event(*args) } + end -protected + def teardown + Thread.abort_on_exception = false + end - def random_id - @random_id ||= ActiveSupport::SecureRandom.hex(10) - end + private + def event(*args) + ActiveSupport::Notifications::Event.new(*args) + end - def event(*args) - ActiveSupport::Notifications::Event.new(*args) + def drain + @notifier.wait + end end -end -class NotificationsMainTest < Test::Unit::TestCase - def setup - @events = [] - Thread.abort_on_exception = true - ActiveSupport::Notifications.subscribe do |*args| - @events << ActiveSupport::Notifications::Event.new(*args) + class PubSubTest < TestCase + def test_events_are_published_to_a_listener + @notifier.publish :foo + @notifier.wait + assert_equal [[:foo]], @events end - end - def teardown - Thread.abort_on_exception = false - ActiveSupport::Notifications.queue.clear - end - - def test_notifications_returns_action_result - result = ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - 1 + 1 - end + def test_subscriber_with_pattern + events = [] + @notifier.subscribe('1') { |*args| events << args } - assert_equal 2, result - end + @notifier.publish '1' + @notifier.publish '1.a' + @notifier.publish 'a.1' + @notifier.wait - def test_events_are_published_to_a_listener - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - 1 + 1 + assert_equal [['1'], ['1.a']], events end - drain - - assert_equal 1, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload - end + def test_subscriber_with_pattern_as_regexp + events = [] + @notifier.subscribe(/\d/) { |*args| events << args } - def test_nested_events_can_be_instrumented - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - ActiveSupport::Notifications.instrument(:wot, :payload => "child") do - 1 + 1 - end + @notifier.publish '1' + @notifier.publish 'a.1' + @notifier.publish '1.a' + @notifier.wait - drain - - assert_equal 1, @events.size - assert_equal :wot, @events.first.name - assert_equal Hash[:payload => "child"], @events.first.payload + assert_equal [['1'], ['a.1'], ['1.a']], events end - drain + def test_multiple_subscribers + @another = [] + @notifier.subscribe { |*args| @another << args } + @notifier.publish :foo + @notifier.wait - assert_equal 2, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload - end - - def test_event_is_pushed_even_if_block_fails - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") do - raise "OMG" - end rescue RuntimeError - - drain + assert_equal [[:foo]], @events + assert_equal [[:foo]], @another + end - assert_equal 1, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload + private + def event(*args) + args + end end - def test_event_is_pushed_even_without_block - ActiveSupport::Notifications.instrument(:awesome, :payload => "notifications") - drain + class SyncPubSubTest < PubSubTest + def setup + Thread.abort_on_exception = true - assert_equal 1, @events.size - assert_equal :awesome, @events.last.name - assert_equal Hash[:payload => "notifications"], @events.last.payload + @notifier = ActiveSupport::Notifications::Notifier.new(ActiveSupport::Notifications::Fanout.new(true)) + @events = [] + @notifier.subscribe { |*args| @events << event(*args) } + end end - def test_subscribed_in_a_transaction - @another = [] - - ActiveSupport::Notifications.subscribe("cache") do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + class InstrumentationTest < TestCase + def test_instrument_returns_block_result + assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 } end - ActiveSupport::Notifications.instrument(:cache){ 1 } - ActiveSupport::Notifications.transaction do - ActiveSupport::Notifications.instrument(:cache){ 1 } - end - ActiveSupport::Notifications.instrument(:cache){ 1 } + def test_nested_events_can_be_instrumented + @notifier.instrument(:awesome, :payload => "notifications") do + @notifier.instrument(:wot, :payload => "child") do + 1 + 1 + end - drain + drain - assert_equal 3, @another.size - before, during, after = @another.map {|e| e.transaction_id } - assert_equal before, after - assert_not_equal before, during - end + assert_equal 1, @events.size + assert_equal :wot, @events.first.name + assert_equal Hash[:payload => "child"], @events.first.payload + end - def test_subscriber_with_pattern - @another = [] + drain - ActiveSupport::Notifications.subscribe("cache") do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + assert_equal 2, @events.size + assert_equal :awesome, @events.last.name + assert_equal Hash[:payload => "notifications"], @events.last.payload end - ActiveSupport::Notifications.instrument(:cache){ 1 } - - drain + def test_instrument_publishes_when_exception_is_raised + begin + @notifier.instrument(:awesome, :payload => "notifications") do + raise "OMG" + end + flunk + rescue + end - assert_equal 1, @another.size - assert_equal :cache, @another.first.name - assert_equal 1, @another.first.result - end + drain - def test_subscriber_with_pattern_as_regexp - @another = [] - ActiveSupport::Notifications.subscribe(/cache/) do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + assert_equal 1, @events.size + assert_equal :awesome, @events.last.name + assert_equal Hash[:payload => "notifications"], @events.last.payload end - ActiveSupport::Notifications.instrument(:something){ 0 } - ActiveSupport::Notifications.instrument(:cache){ 1 } - - drain + def test_event_is_pushed_even_without_block + @notifier.instrument(:awesome, :payload => "notifications") + drain - assert_equal 1, @another.size - assert_equal :cache, @another.first.name - assert_equal 1, @another.first.result + assert_equal 1, @events.size + assert_equal :awesome, @events.last.name + assert_equal Hash[:payload => "notifications"], @events.last.payload + end end - def test_subscriber_allows_sync_listeners - @another = [] - ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + class EventTest < TestCase + def test_events_are_initialized_with_details + event = event(:foo, Time.now, Time.now + 1, 1, random_id, :payload => :bar) + assert_equal :foo, event.name + assert_equal Hash[:payload => :bar], event.payload end - Thread.expects(:new).never - ActiveSupport::Notifications.instrument(:something){ 0 } - ActiveSupport::Notifications.instrument(:cache){ 1 } + def test_events_consumes_information_given_as_payload + time = Time.now + event = event(:foo, time, time + 0.01, 1, random_id, {}) - assert_equal 1, @another.size - assert_equal :cache, @another.first.name - assert_equal 1, @another.first.result - end - - def test_with_several_consumers_and_several_events - @another = [] - ActiveSupport::Notifications.subscribe do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) + assert_equal Hash.new, event.payload + assert_equal time, event.time + assert_equal 1, event.result + assert_equal 10.0, event.duration end - 1.upto(100) do |i| - ActiveSupport::Notifications.instrument(:value){ i } - end + def test_event_is_parent_based_on_time_frame + time = Time.utc(2009, 01, 01, 0, 0, 1) - drain + parent = event(:foo, Time.utc(2009), Time.utc(2009) + 100, nil, random_id, {}) + child = event(:foo, time, time + 10, nil, random_id, {}) + not_child = event(:foo, time, time + 100, nil, random_id, {}) - assert_equal 100, @events.size - assert_equal :value, @events.first.name - assert_equal 1, @events.first.result - assert_equal 100, @events.last.result + assert parent.parent_of?(child) + assert !child.parent_of?(parent) + assert !parent.parent_of?(not_child) + assert !not_child.parent_of?(parent) + end - assert_equal 100, @another.size - assert_equal :value, @another.first.name - assert_equal 1, @another.first.result - assert_equal 100, @another.last.result + protected + def random_id + @random_id ||= ActiveSupport::SecureRandom.hex(10) + end end - - private - def drain - sleep(0.05) until ActiveSupport::Notifications.queue.drained? - end end |