aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications/fanout.rb
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb73
1 files changed, 60 insertions, 13 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 4e4ca70942..c506b35b1e 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -2,6 +2,7 @@
require "mutex_m"
require "concurrent/map"
+require "set"
module ActiveSupport
module Notifications
@@ -13,16 +14,22 @@ module ActiveSupport
include Mutex_m
def initialize
- @subscribers = []
+ @string_subscribers = Hash.new { |h, k| h[k] = [] }
+ @other_subscribers = []
@listeners_for = Concurrent::Map.new
super
end
- def subscribe(pattern = nil, block = Proc.new)
- subscriber = Subscribers.new pattern, block
+ def subscribe(pattern = nil, callable = nil, &block)
+ subscriber = Subscribers.new(pattern, callable || block)
synchronize do
- @subscribers << subscriber
- @listeners_for.clear
+ if String === pattern
+ @string_subscribers[pattern] << subscriber
+ @listeners_for.delete(pattern)
+ else
+ @other_subscribers << subscriber
+ @listeners_for.clear
+ end
end
subscriber
end
@@ -31,12 +38,19 @@ module ActiveSupport
synchronize do
case subscriber_or_name
when String
- @subscribers.reject! { |s| s.matches?(subscriber_or_name) }
+ @string_subscribers[subscriber_or_name].clear
+ @listeners_for.delete(subscriber_or_name)
+ @other_subscribers.each { |sub| sub.unsubscribe!(subscriber_or_name) }
else
- @subscribers.delete(subscriber_or_name)
+ pattern = subscriber_or_name.try(:pattern)
+ if String === pattern
+ @string_subscribers[pattern].delete(subscriber_or_name)
+ @listeners_for.delete(pattern)
+ else
+ @other_subscribers.delete(subscriber_or_name)
+ @listeners_for.clear
+ end
end
-
- @listeners_for.clear
end
end
@@ -56,7 +70,8 @@ module ActiveSupport
# this is correctly done double-checked locking (Concurrent::Map's lookups have volatile semantics)
@listeners_for[name] || synchronize do
# use synchronisation when accessing @subscribers
- @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ @listeners_for[name] ||=
+ @string_subscribers[name] + @other_subscribers.select { |s| s.subscribed_to?(name) }
end
end
@@ -100,9 +115,33 @@ module ActiveSupport
end
end
+ class Matcher #:nodoc:
+ attr_reader :pattern, :exclusions
+
+ def self.wrap(pattern)
+ return pattern if String === pattern
+ new(pattern)
+ end
+
+ def initialize(pattern)
+ @pattern = pattern
+ @exclusions = Set.new
+ end
+
+ def unsubscribe!(name)
+ exclusions << -name if pattern === name
+ end
+
+ def ===(name)
+ pattern === name && !exclusions.include?(name)
+ end
+ end
+
class Evented #:nodoc:
+ attr_reader :pattern
+
def initialize(pattern, delegate)
- @pattern = pattern
+ @pattern = Matcher.wrap(pattern)
@delegate = delegate
@can_publish = delegate.respond_to?(:publish)
end
@@ -122,11 +161,15 @@ module ActiveSupport
end
def subscribed_to?(name)
- @pattern === name
+ pattern === name
end
def matches?(name)
- @pattern && @pattern === name
+ pattern && pattern === name
+ end
+
+ def unsubscribe!(name)
+ pattern.unsubscribe!(name)
end
end
@@ -189,6 +232,10 @@ module ActiveSupport
true
end
+ def unsubscribe!(*)
+ false
+ end
+
alias :matches? :===
end
end