From 2d7abe245e7a2b1717e48ef550e4083318fd7ec2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Thu, 15 Oct 2009 18:51:51 -0300 Subject: Renamed Orchestra to Notifications once again [#3321 state:resolved] --- activesupport/lib/active_support/autoload.rb | 2 +- activesupport/lib/active_support/cache.rb | 2 +- activesupport/lib/active_support/notifications.rb | 171 ++++++++++++++++++++++ activesupport/lib/active_support/orchestra.rb | 171 ---------------------- 4 files changed, 173 insertions(+), 173 deletions(-) create mode 100644 activesupport/lib/active_support/notifications.rb delete mode 100644 activesupport/lib/active_support/orchestra.rb (limited to 'activesupport/lib/active_support') diff --git a/activesupport/lib/active_support/autoload.rb b/activesupport/lib/active_support/autoload.rb index fa657ac99a..63f7338a68 100644 --- a/activesupport/lib/active_support/autoload.rb +++ b/activesupport/lib/active_support/autoload.rb @@ -18,9 +18,9 @@ module ActiveSupport autoload :MessageVerifier, 'active_support/message_verifier' autoload :Multibyte, 'active_support/multibyte' autoload :OptionMerger, 'active_support/option_merger' - autoload :Orchestra, 'active_support/orchestra' autoload :OrderedHash, 'active_support/ordered_hash' autoload :OrderedOptions, 'active_support/ordered_options' + autoload :Notifications, 'active_support/notifications' autoload :Rescuable, 'active_support/rescuable' autoload :SecureRandom, 'active_support/secure_random' autoload :StringInquirer, 'active_support/string_inquirer' diff --git a/activesupport/lib/active_support/cache.rb b/activesupport/lib/active_support/cache.rb index a2717499e7..818983fdd6 100644 --- a/activesupport/lib/active_support/cache.rb +++ b/activesupport/lib/active_support/cache.rb @@ -256,7 +256,7 @@ module ActiveSupport if self.class.instrument payload = { :key => key } payload.merge!(options) if options.is_a?(Hash) - ActiveSupport::Orchestra.instrument(:"cache_#{operation}", payload, &block) + ActiveSupport::Notifications.instrument(:"cache_#{operation}", payload, &block) else yield end diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb new file mode 100644 index 0000000000..7e9ffca13f --- /dev/null +++ b/activesupport/lib/active_support/notifications.rb @@ -0,0 +1,171 @@ +require 'thread' +require 'active_support/core_ext/module/delegation' +require 'active_support/core_ext/module/attribute_accessors' + +module ActiveSupport + # Notifications provides an instrumentation API for Ruby. To instrument an + # action in Ruby you just need to do: + # + # ActiveSupport::Notifications.instrument(:render, :extra => :information) do + # render :text => "Foo" + # end + # + # You can consume those events and the information they provide by registering + # a subscriber. For instance, let's store all instrumented events in an array: + # + # @events = [] + # + # ActiveSupport::Notifications.subscribe do |event| + # @events << event + # end + # + # ActiveSupport::Notifications.instrument(:render, :extra => :information) do + # render :text => "Foo" + # end + # + # event = @events.first + # event.class #=> ActiveSupport::Notifications::Event + # event.name #=> :render + # event.duration #=> 10 (in miliseconds) + # event.result #=> "Foo" + # event.payload #=> { :extra => :information } + # + # When subscribing to Notifications, you can pass a pattern, to only consume + # events that match the pattern: + # + # ActiveSupport::Notifications.subscribe(/render/) do |event| + # @render_events << event + # end + # + # Notifications ships with a queue implementation that consumes and publish events + # to subscribers in a thread. You can use any queue implementation you want. + # + module Notifications + mattr_accessor :queue + + class << self + delegate :instrument, :to => :instrumenter + + def instrumenter + Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher) + end + + def publisher + @publisher ||= Publisher.new(queue) + end + + def subscribe(pattern=nil, &block) + Subscriber.new(queue).bind(pattern).subscribe(&block) + end + end + + class Instrumenter + def initialize(publisher) + @publisher = publisher + end + + def instrument(name, payload={}) + payload[:time] = Time.now + payload[:thread_id] = Thread.current.object_id + payload[:result] = yield if block_given? + ensure + payload[:duration] = 1000 * (Time.now.to_f - payload[:time].to_f) + @publisher.publish(name, payload) + end + end + + class Publisher + def initialize(queue) + @queue = queue + end + + def publish(name, payload) + @queue.publish(name, payload) + end + end + + class Subscriber + def initialize(queue) + @queue = queue + end + + def bind(pattern) + @pattern = pattern + self + end + + def subscribe + @queue.subscribe(@pattern) do |name, payload| + yield Event.new(name, payload) + end + end + end + + class Event + attr_reader :name, :time, :duration, :thread_id, :result, :payload + + def initialize(name, payload) + @name = name + @payload = payload.dup + @time = @payload.delete(:time) + @thread_id = @payload.delete(:thread_id) + @result = @payload.delete(:result) + @duration = @payload.delete(:duration) + end + + def parent_of?(event) + start = (self.time - event.time) * 1000 + start <= 0 && (start + self.duration >= event.duration) + end + end + + # This is a default queue implementation that ships with Notifications. It + # consumes events in a thread and publish them to all registered subscribers. + # + class LittleFanout + def initialize + @listeners, @stream = [], Queue.new + @thread = Thread.new { consume } + end + + def publish(*event) + @stream.push(event) + end + + def subscribe(pattern=nil, &block) + @listeners << Listener.new(pattern, &block) + end + + def consume + while event = @stream.shift + @listeners.each { |l| l.publish(*event) } + end + end + + class Listener + attr_reader :thread + + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + @queue = Queue.new + @thread = Thread.new { consume } + end + + def publish(name, payload) + unless @pattern && !(@pattern === name.to_s) + @queue << [name, payload] + end + end + + def consume + while event = @queue.shift + @subscriber.call(*event) + end + end + end + end + end + + Notifications.queue = Notifications::LittleFanout.new +end diff --git a/activesupport/lib/active_support/orchestra.rb b/activesupport/lib/active_support/orchestra.rb deleted file mode 100644 index 5f57127401..0000000000 --- a/activesupport/lib/active_support/orchestra.rb +++ /dev/null @@ -1,171 +0,0 @@ -require 'thread' -require 'active_support/core_ext/module/delegation' -require 'active_support/core_ext/module/attribute_accessors' - -module ActiveSupport - # Orchestra provides an instrumentation API for Ruby. To instrument an action - # in Ruby you just need to do: - # - # ActiveSupport::Orchestra.instrument(:render, :extra => :information) do - # render :text => "Foo" - # end - # - # You can consume those events and the information they provide by registering - # a subscriber. For instance, let's store all instrumented events in an array: - # - # @events = [] - # - # ActiveSupport::Orchestra.subscribe do |event| - # @events << event - # end - # - # ActiveSupport::Orchestra.instrument(:render, :extra => :information) do - # render :text => "Foo" - # end - # - # event = @events.first - # event.class #=> ActiveSupport::Orchestra::Event - # event.name #=> :render - # event.duration #=> 10 (in miliseconds) - # event.result #=> "Foo" - # event.payload #=> { :extra => :information } - # - # When subscribing to Orchestra, you can pass a pattern, to only consume - # events that match the pattern: - # - # ActiveSupport::Orchestra.subscribe(/render/) do |event| - # @render_events << event - # end - # - # Orchestra ships with a queue implementation that consumes and publish events - # to subscribers in a thread. You can use any queue implementation you want. - # - module Orchestra - mattr_accessor :queue - - class << self - delegate :instrument, :to => :instrumenter - - def instrumenter - Thread.current[:orchestra_instrumeter] ||= Instrumenter.new(publisher) - end - - def publisher - @publisher ||= Publisher.new(queue) - end - - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) - end - end - - class Instrumenter - def initialize(publisher) - @publisher = publisher - end - - def instrument(name, payload={}) - payload[:time] = Time.now - payload[:thread_id] = Thread.current.object_id - payload[:result] = yield if block_given? - ensure - payload[:duration] = 1000 * (Time.now.to_f - payload[:time].to_f) - @publisher.publish(name, payload) - end - end - - class Publisher - def initialize(queue) - @queue = queue - end - - def publish(name, payload) - @queue.publish(name, payload) - end - end - - class Subscriber - def initialize(queue) - @queue = queue - end - - def bind(pattern) - @pattern = pattern - self - end - - def subscribe - @queue.subscribe(@pattern) do |name, payload| - yield Event.new(name, payload) - end - end - end - - class Event - attr_reader :name, :time, :duration, :thread_id, :result, :payload - - def initialize(name, payload) - @name = name - @payload = payload.dup - @time = @payload.delete(:time) - @thread_id = @payload.delete(:thread_id) - @result = @payload.delete(:result) - @duration = @payload.delete(:duration) - end - - def parent_of?(event) - start = (self.time - event.time) * 1000 - start <= 0 && (start + self.duration >= event.duration) - end - end - - # This is a default queue implementation that ships with Orchestra. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners, @stream = [], Queue.new - @thread = Thread.new { consume } - end - - def publish(*event) - @stream.push(event) - end - - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) - end - - def consume - while event = @stream.shift - @listeners.each { |l| l.publish(*event) } - end - end - - class Listener - attr_reader :thread - - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - @thread = Thread.new { consume } - end - - def publish(name, payload) - unless @pattern && !(@pattern === name.to_s) - @queue << [name, payload] - end - end - - def consume - while event = @queue.shift - @subscriber.call(*event) - end - end - end - end - end - - Orchestra.queue = Orchestra::LittleFanout.new -end -- cgit v1.2.3