diff options
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 63 |
1 files changed, 56 insertions, 7 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 11721db103..c37bec4ee5 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -2,6 +2,7 @@ require "mutex_m" require "concurrent/map" +require "set" module ActiveSupport module Notifications @@ -19,8 +20,8 @@ module ActiveSupport super end - def subscribe(pattern = nil, block = Proc.new) - subscriber = Subscribers.new pattern, block + def subscribe(pattern = nil, callable = nil, monotonic = false, &block) + subscriber = Subscribers.new(monotonic, pattern, callable || block) synchronize do if String === pattern @string_subscribers[pattern] << subscriber @@ -39,6 +40,7 @@ module ActiveSupport when String @string_subscribers[subscriber_or_name].clear @listeners_for.delete(subscriber_or_name) + @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) } else pattern = subscriber_or_name.try(:pattern) if String === pattern @@ -82,8 +84,8 @@ module ActiveSupport end module Subscribers # :nodoc: - def self.new(pattern, listener) - subscriber_class = Timed + def self.new(monotonic, pattern, listener) + subscriber_class = monotonic ? MonotonicTimed : Timed if listener.respond_to?(:start) && listener.respond_to?(:finish) subscriber_class = Evented @@ -113,11 +115,33 @@ module ActiveSupport end end + class Matcher #:nodoc: + attr_reader :pattern, :exclusions + + def self.wrap(pattern) + return pattern if String === pattern + new(pattern) + end + + def initialize(pattern) + @pattern = pattern + @exclusions = Set.new + end + + def unsubscribe!(name) + exclusions << -name if pattern === name + end + + def ===(name) + pattern === name && !exclusions.include?(name) + end + end + class Evented #:nodoc: attr_reader :pattern def initialize(pattern, delegate) - @pattern = pattern + @pattern = Matcher.wrap(pattern) @delegate = delegate @can_publish = delegate.respond_to?(:publish) end @@ -137,11 +161,15 @@ module ActiveSupport end def subscribed_to?(name) - @pattern === name + pattern === name end def matches?(name) - @pattern && @pattern === name + pattern && pattern === name + end + + def unsubscribe!(name) + pattern.unsubscribe!(name) end end @@ -162,6 +190,23 @@ module ActiveSupport end end + class MonotonicTimed < Evented # :nodoc: + def publish(name, *args) + @delegate.call name, *args + end + + def start(name, id, payload) + timestack = Thread.current[:_timestack_monotonic] ||= [] + timestack.push Concurrent.monotonic_time + end + + def finish(name, id, payload) + timestack = Thread.current[:_timestack_monotonic] + started = timestack.pop + @delegate.call(name, started, Concurrent.monotonic_time, id, payload) + end + end + class EventObject < Evented def start(name, id, payload) stack = Thread.current[:_event_stack] ||= [] @@ -204,6 +249,10 @@ module ActiveSupport true end + def unsubscribe!(*) + false + end + alias :matches? :=== end end |