diff options
author | Aaron Patterson <aaron.patterson@gmail.com> | 2012-06-19 15:33:14 -0700 |
---|---|---|
committer | Aaron Patterson <aaron.patterson@gmail.com> | 2012-06-19 15:33:14 -0700 |
commit | bf8e20586b131cd03537556f6494ec3195e8aeec (patch) | |
tree | 8a455326b34815384a4f53305909ee6a0a99cd92 | |
parent | ceba010ea254e987eb266e31c55f45fe51b80713 (diff) | |
download | rails-bf8e20586b131cd03537556f6494ec3195e8aeec.tar.gz rails-bf8e20586b131cd03537556f6494ec3195e8aeec.tar.bz2 rails-bf8e20586b131cd03537556f6494ec3195e8aeec.zip |
move fanout back to a global variable, add a mutex for safety
-rw-r--r-- | activesupport/lib/active_support/notifications.rb | 31 | ||||
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 24 |
2 files changed, 22 insertions, 33 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 55041ea2af..b4657a8ba9 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -135,24 +135,9 @@ module ActiveSupport # to log subscribers in a thread. You can use any queue implementation you want. # module Notifications - class Registry # :nodoc: - def self.instance - Thread.current[name] ||= new - end - - attr_reader :notifier, :instrumenter - - def initialize - self.notifier = Fanout.new - end - - def notifier=(notifier) - @notifier = notifier - @instrumenter = Instrumenter.new(notifier) - end - end - class << self + attr_accessor :notifier + def publish(name, *args) notifier.publish(name, *args) end @@ -181,16 +166,10 @@ module ActiveSupport end def instrumenter - Registry.instance.instrumenter - end - - def notifier - Registry.instance.notifier - end - - def notifier=(notifier) - Registry.instance.notifier = notifier + Thread.current[:"instrumentation_#{notifier.object_id}"] ||= Instrumenter.new(notifier) end end + + self.notifier = Fanout.new end end diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 3a2ccf1489..6ffc091233 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -1,26 +1,34 @@ +require 'mutex_m' + module ActiveSupport module Notifications # This is a default queue implementation that ships with Notifications. # It just pushes events to all registered log subscribers. # - # Only one of these objects should instantiated per thread. Concurrent - # access to this class is not allowed. + # This class is thread safe. All methods are reentrant. class Fanout + include Mutex_m + def initialize @subscribers = [] @listeners_for = {} + super end def subscribe(pattern = nil, block = Proc.new) subscriber = Subscribers.new pattern, block - @subscribers << subscriber - @listeners_for.clear + synchronize do + @subscribers << subscriber + @listeners_for.clear + end subscriber end def unsubscribe(subscriber) - @subscribers.reject! { |s| s.matches?(subscriber) } - @listeners_for.clear + synchronize do + @subscribers.reject! { |s| s.matches?(subscriber) } + @listeners_for.clear + end end def start(name, id, payload) @@ -36,7 +44,9 @@ module ActiveSupport end def listeners_for(name) - @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + synchronize do + @listeners_for[name] ||= @subscribers.select { |s| s.subscribed_to?(name) } + end end def listening?(name) |