diff options
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 118 |
1 files changed, 101 insertions, 17 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index a9aa5464e9..2e5bcf4639 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -1,24 +1,42 @@ +require 'mutex_m' + module ActiveSupport module Notifications # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. + # + # This class is thread safe. All methods are reentrant. class Fanout + include Mutex_m + def initialize @subscribers = [] @listeners_for = {} + super end def subscribe(pattern = nil, block = Proc.new) - subscriber = Subscriber.new(pattern, block).tap do |s| - @subscribers << s + subscriber = Subscribers.new pattern, block + synchronize do + @subscribers << subscriber + @listeners_for.clear end - @listeners_for.clear subscriber end def unsubscribe(subscriber) - @subscribers.reject! {|s| s.matches?(subscriber)} - @listeners_for.clear + synchronize do + @subscribers.reject! { |s| s.matches?(subscriber) } + @listeners_for.clear + end + end + + def start(name, id, payload) + listeners_for(name).each { |s| s.start(name, id, payload) } + end + + def finish(name, id, payload) + listeners_for(name).each { |s| s.finish(name, id, payload) } end def publish(name, *args) @@ -26,7 +44,9 @@ module ActiveSupport end def listeners_for(name) - @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + synchronize do + @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + end end def listening?(name) @@ -37,23 +57,87 @@ module ActiveSupport def wait end - class Subscriber #:nodoc: - def initialize(pattern, delegate) - @pattern = pattern - @delegate = delegate + module Subscribers # :nodoc: + def self.new(pattern, listener) + if listener.respond_to?(:start) and listener.respond_to?(:finish) + subscriber = Evented.new pattern, listener + else + subscriber = Timed.new pattern, listener + end + + unless pattern + AllMessages.new(subscriber) + else + subscriber + end end - def publish(message, *args) - @delegate.call(message, *args) + class Evented #:nodoc: + def initialize(pattern, delegate) + @pattern = pattern + @delegate = delegate + end + + def start(name, id, payload) + @delegate.start name, id, payload + end + + def finish(name, id, payload) + @delegate.finish name, id, payload + end + + def subscribed_to?(name) + @pattern === name.to_s + end + + def matches?(subscriber_or_name) + self === subscriber_or_name || + @pattern && @pattern === subscriber_or_name + end end - def subscribed_to?(name) - !@pattern || @pattern === name.to_s + class Timed < Evented + def initialize(pattern, delegate) + @timestack = [] + super + end + + def publish(name, *args) + @delegate.call name, *args + end + + def start(name, id, payload) + @timestack.push Time.now + end + + def finish(name, id, payload) + started = @timestack.pop + @delegate.call(name, started, Time.now, id, payload) + end end - def matches?(subscriber_or_name) - self === subscriber_or_name || - @pattern && @pattern === subscriber_or_name + class AllMessages # :nodoc: + def initialize(delegate) + @delegate = delegate + end + + def start(name, id, payload) + @delegate.start name, id, payload + end + + def finish(name, id, payload) + @delegate.finish name, id, payload + end + + def publish(name, *args) + @delegate.publish name, *args + end + + def subscribed_to?(name) + true + end + + alias :matches? :=== end end end |