aboutsummaryrefslogblamecommitdiffstats
path: root/activesupport/lib/active_support/notifications.rb
blob: 2cf91fc383cb76176e163cdbc208e7361fe1052a (plain) (tree)
1
2
3
4
5
6
7
8
9
                
                                                   
                                                            
                                      

                    

                                                                            
   
                                                                                 


                             

                                                                                
   

                  
                                                       

                        
   
                                                                                 


                             
                           
                                                             




                                                    
                                                                              

                                  
                                                                 


                               
                                                                                    
                                                                              
   
                      

                         



                                                
                                                                                  













                                                             
                                         

         
                                      

                                      
            
                                                                      
         

       




                           

                             













                           
                                             
                     

           


               
                                                                   
 
                                                                     
                         
                                







                                             
         


                                               
                                                          
         

       
                                                                              


                                                                                 
                    


                              

         

                          

         

                                                   

         
                 

                                                  
           


                    
                             
 


                                       
                            
                                

           


                                                



                   

                                   

             


         
 
                                                       
   
require 'thread'
require 'active_support/core_ext/module/delegation'
require 'active_support/core_ext/module/attribute_accessors'
require 'active_support/secure_random'

module ActiveSupport
  # Notifications provides an instrumentation API for Ruby. To instrument an
  # action in Ruby you just need to do:
  #
  #   ActiveSupport::Notifications.instrument(:render, :extra => :information) do
  #     render :text => "Foo"
  #   end
  #
  # You can consume those events and the information they provide by registering
  # a subscriber. For instance, let's store all instrumented events in an array:
  #
  #   @events = []
  #
  #   ActiveSupport::Notifications.subscribe do |event|
  #     @events << event
  #   end
  #
  #   ActiveSupport::Notifications.instrument(:render, :extra => :information) do
  #     render :text => "Foo"
  #   end
  #
  #   event = @events.first
  #   event.class     #=> ActiveSupport::Notifications::Event
  #   event.name      #=> :render
  #   event.duration  #=> 10 (in miliseconds)
  #   event.result    #=> "Foo"
  #   event.payload   #=> { :extra => :information }
  #
  # When subscribing to Notifications, you can pass a pattern, to only consume
  # events that match the pattern:
  #
  #   ActiveSupport::Notifications.subscribe(/render/) do |event|
  #     @render_events << event
  #   end
  #
  # Notifications ships with a queue implementation that consumes and publish events
  # to subscribers in a thread. You can use any queue implementation you want.
  #
  module Notifications
    mattr_accessor :queue

    class << self
      delegate :instrument, :to => :instrumenter

      def instrumenter
        Thread.current[:notifications_instrumeter] ||= Instrumenter.new(publisher)
      end

      def publisher
        @publisher ||= Publisher.new(queue)
      end

      def subscribe(pattern=nil, &block)
        Subscriber.new(queue).bind(pattern).subscribe(&block)
      end
    end

    class Instrumenter
      def initialize(publisher)
        @publisher = publisher
        @id        = SecureRandom.hex(10)
      end

      def instrument(name, payload={})
        time   = Time.now
        result = yield if block_given?
      ensure
        @publisher.publish(name, time, Time.now, result, @id, payload)
      end
    end

    class Publisher
      def initialize(queue)
        @queue = queue
      end

      def publish(*args)
        @queue.publish(*args)
      end
    end

    class Subscriber
      def initialize(queue)
        @queue = queue
      end

      def bind(pattern)
        @pattern = pattern
        self
      end

      def subscribe
        @queue.subscribe(@pattern) do |*args|
          yield *args
        end
      end
    end

    class Event
      attr_reader :name, :time, :end, :thread_id, :result, :payload

      def initialize(name, start, ending, result, thread_id, payload)
        @name      = name
        @payload   = payload.dup
        @time      = start
        @thread_id = thread_id
        @end       = ending
        @result    = result
      end

      def duration
        @duration ||= 1000.0 * (@end - @time)
      end

      def parent_of?(event)
        start = (self.time - event.time) * 1000
        start <= 0 && (start + duration >= event.duration)
      end
    end

    # This is a default queue implementation that ships with Notifications. It
    # consumes events in a thread and publish them to all registered subscribers.
    #
    class LittleFanout
      def initialize
        @listeners = []
        @stream    = Queue.new
        Thread.new { consume }
      end

      def publish(*args)
        @stream.push(args)
      end

      def subscribe(pattern=nil, &block)
        @listeners << Listener.new(pattern, &block)
      end

      def consume
        while args = @stream.shift
          @listeners.each { |l| l.publish(*args) }
        end
      end

      class Listener
        # attr_reader :thread

        def initialize(pattern, &block)
          @pattern = pattern
          @subscriber = block
          @queue = Queue.new
          Thread.new { consume }
        end

        def publish(name, *args)
          if !@pattern || @pattern === name.to_s
            @queue << args.unshift(name)
          end
        end

        def consume
          while args = @queue.shift
            @subscriber.call(*args)
          end
        end
      end
    end
  end

  Notifications.queue = Notifications::LittleFanout.new
end