diff options
Diffstat (limited to 'activesupport')
-rw-r--r-- | activesupport/lib/active_support/notifications.rb | 92 | ||||
-rw-r--r-- | activesupport/test/notifications_test.rb | 17 |
2 files changed, 74 insertions, 35 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 7a9f76b26a..e2540cd598 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -41,7 +41,7 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + mattr_accessor :queue, :listener class << self delegate :instrument, :transaction_id, :transaction, :to => :instrumenter @@ -54,8 +54,13 @@ module ActiveSupport @publisher ||= Publisher.new(queue) end - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) + def subscriber + @subscriber ||= Subscriber.new(queue) + end + + def subscribe(pattern=nil, options={}, &block) + with = options[:with] || listener + subscriber.bind(with, pattern).subscribe(&block) end end @@ -104,13 +109,14 @@ module ActiveSupport @queue = queue end - def bind(pattern) - @pattern = pattern + def bind(listener, pattern) + @listener = listener + @pattern = pattern self end def subscribe - @queue.subscribe(@pattern) do |*args| + @queue.subscribe(@listener, @pattern) do |*args| yield(*args) end end @@ -138,6 +144,48 @@ module ActiveSupport end end + class AsyncListener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + @queue = Queue.new + Thread.new { consume } + end + + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @queue << args.unshift(name) + end + end + + def consume + while args = @queue.shift + @subscriber.call(*args) + end + end + + def drained? + @queue.size.zero? + end + end + + class SyncListener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + end + + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @subscriber.call(*args.unshift(name)) + end + end + + def drained? + true + end + end + # This is a default queue implementation that ships with Notifications. It # consumes events in a thread and publish them to all registered subscribers. # @@ -150,40 +198,16 @@ module ActiveSupport @listeners.each { |l| l.publish(*args) } end - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) + def subscribe(listener, pattern=nil, &block) + @listeners << listener.new(pattern, &block) end def drained? @listeners.all? &:drained? end - - class Listener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end - - def drained? - @queue.size.zero? - end - end end end - Notifications.queue = Notifications::LittleFanout.new + Notifications.queue = Notifications::LittleFanout.new + Notifications.listener = Notifications::AsyncListener end diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb index 01106e83e9..35d44367cf 100644 --- a/activesupport/test/notifications_test.rb +++ b/activesupport/test/notifications_test.rb @@ -176,6 +176,21 @@ class NotificationsMainTest < Test::Unit::TestCase assert_equal 1, @another.first.result end + def test_subscriber_allows_sync_listeners + @another = [] + ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args| + @another << ActiveSupport::Notifications::Event.new(*args) + end + + Thread.expects(:new).never + ActiveSupport::Notifications.instrument(:something){ 0 } + ActiveSupport::Notifications.instrument(:cache){ 1 } + + assert_equal 1, @another.size + assert_equal :cache, @another.first.name + assert_equal 1, @another.first.result + end + def test_with_several_consumers_and_several_events @another = [] ActiveSupport::Notifications.subscribe do |*args| @@ -201,6 +216,6 @@ class NotificationsMainTest < Test::Unit::TestCase private def drain - sleep(0.1) until ActiveSupport::Notifications.queue.drained? + sleep(0.05) until ActiveSupport::Notifications.queue.drained? end end |