aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib
diff options
context:
space:
mode:
Diffstat (limited to 'activesupport/lib')
-rw-r--r--activesupport/lib/active_support/notifications.rb31
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb24
2 files changed, 22 insertions, 33 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index 55041ea2af..b4657a8ba9 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -135,24 +135,9 @@ module ActiveSupport
# to log subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- class Registry # :nodoc:
- def self.instance
- Thread.current[name] ||= new
- end
-
- attr_reader :notifier, :instrumenter
-
- def initialize
- self.notifier = Fanout.new
- end
-
- def notifier=(notifier)
- @notifier = notifier
- @instrumenter = Instrumenter.new(notifier)
- end
- end
-
class << self
+ attr_accessor :notifier
+
def publish(name, *args)
notifier.publish(name, *args)
end
@@ -181,16 +166,10 @@ module ActiveSupport
end
def instrumenter
- Registry.instance.instrumenter
- end
-
- def notifier
- Registry.instance.notifier
- end
-
- def notifier=(notifier)
- Registry.instance.notifier = notifier
+ Thread.current[:"instrumentation_#{notifier.object_id}"] ||= Instrumenter.new(notifier)
end
end
+
+ self.notifier = Fanout.new
end
end
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 3a2ccf1489..6ffc091233 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -1,26 +1,34 @@
+require 'mutex_m'
+
module ActiveSupport
module Notifications
# This is a default queue implementation that ships with Notifications.
# It just pushes events to all registered log subscribers.
#
- # Only one of these objects should instantiated per thread. Concurrent
- # access to this class is not allowed.
+ # This class is thread safe. All methods are reentrant.
class Fanout
+ include Mutex_m
+
def initialize
@subscribers = []
@listeners_for = {}
+ super
end
def subscribe(pattern = nil, block = Proc.new)
subscriber = Subscribers.new pattern, block
- @subscribers << subscriber
- @listeners_for.clear
+ synchronize do
+ @subscribers << subscriber
+ @listeners_for.clear
+ end
subscriber
end
def unsubscribe(subscriber)
- @subscribers.reject! { |s| s.matches?(subscriber) }
- @listeners_for.clear
+ synchronize do
+ @subscribers.reject! { |s| s.matches?(subscriber) }
+ @listeners_for.clear
+ end
end
def start(name, id, payload)
@@ -36,7 +44,9 @@ module ActiveSupport
end
def listeners_for(name)
- @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ synchronize do
+ @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) }
+ end
end
def listening?(name)