aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications/fanout.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb63
1 files changed, 56 insertions, 7 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 11721db103..c37bec4ee5 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -2,6 +2,7 @@
require "mutex_m"
require "concurrent/map"
+require "set"
module ActiveSupport
module Notifications
@@ -19,8 +20,8 @@ module ActiveSupport
super
end
- def subscribe(pattern = nil, block = Proc.new)
- subscriber = Subscribers.new pattern, block
+ def subscribe(pattern = nil, callable = nil, monotonic = false, &block)
+ subscriber = Subscribers.new(monotonic, pattern, callable || block)
synchronize do
if String === pattern
@string_subscribers[pattern] << subscriber
@@ -39,6 +40,7 @@ module ActiveSupport
when String
@string_subscribers[subscriber_or_name].clear
@listeners_for.delete(subscriber_or_name)
+ @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
else
pattern = subscriber_or_name.try(:pattern)
if String === pattern
@@ -82,8 +84,8 @@ module ActiveSupport
end
module Subscribers # :nodoc:
- def self.new(pattern, listener)
- subscriber_class = Timed
+ def self.new(monotonic, pattern, listener)
+ subscriber_class = monotonic ? MonotonicTimed : Timed
if listener.respond_to?(:start) && listener.respond_to?(:finish)
subscriber_class = Evented
@@ -113,11 +115,33 @@ module ActiveSupport
end
end
+ class Matcher #:nodoc:
+ attr_reader :pattern, :exclusions
+
+ def self.wrap(pattern)
+ return pattern if String === pattern
+ new(pattern)
+ end
+
+ def initialize(pattern)
+ @pattern = pattern
+ @exclusions = Set.new
+ end
+
+ def unsubscribe!(name)
+ exclusions << -name if pattern === name
+ end
+
+ def ===(name)
+ pattern === name && !exclusions.include?(name)
+ end
+ end
+
class Evented #:nodoc:
attr_reader :pattern
def initialize(pattern, delegate)
- @pattern = pattern
+ @pattern = Matcher.wrap(pattern)
@delegate = delegate
@can_publish = delegate.respond_to?(:publish)
end
@@ -137,11 +161,15 @@ module ActiveSupport
end
def subscribed_to?(name)
- @pattern === name
+ pattern === name
end
def matches?(name)
- @pattern && @pattern === name
+ pattern && pattern === name
+ end
+
+ def unsubscribe!(name)
+ pattern.unsubscribe!(name)
end
end
@@ -162,6 +190,23 @@ module ActiveSupport
end
end
+ class MonotonicTimed < Evented # :nodoc:
+ def publish(name, *args)
+ @delegate.call name, *args
+ end
+
+ def start(name, id, payload)
+ timestack = Thread.current[:_timestack_monotonic] ||= []
+ timestack.push Concurrent.monotonic_time
+ end
+
+ def finish(name, id, payload)
+ timestack = Thread.current[:_timestack_monotonic]
+ started = timestack.pop
+ @delegate.call(name, started, Concurrent.monotonic_time, id, payload)
+ end
+ end
+
class EventObject < Evented
def start(name, id, payload)
stack = Thread.current[:_event_stack] ||= []
@@ -204,6 +249,10 @@ module ActiveSupport
true
end
+ def unsubscribe!(*)
+ false
+ end
+
alias :matches? :===
end
end