From 6f7fc5824f2033c0f674b002dbee7f1c3f3384ac Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Tue, 24 Nov 2009 19:26:13 -0800 Subject: Revert "Create SyncListener. Since they do not rely on Thread, they can be used on Google App Engine." Take a step back on this API direction. This reverts commit 8104f65c3225453d13307c3c2733c2a8f99e491a. --- activesupport/lib/active_support/notifications.rb | 92 +++++++++-------------- activesupport/test/notifications_test.rb | 17 +---- 2 files changed, 35 insertions(+), 74 deletions(-) diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index e2540cd598..7a9f76b26a 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -41,7 +41,7 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue, :listener + mattr_accessor :queue class << self delegate :instrument, :transaction_id, :transaction, :to => :instrumenter @@ -54,13 +54,8 @@ module ActiveSupport @publisher ||= Publisher.new(queue) end - def subscriber - @subscriber ||= Subscriber.new(queue) - end - - def subscribe(pattern=nil, options={}, &block) - with = options[:with] || listener - subscriber.bind(with, pattern).subscribe(&block) + def subscribe(pattern=nil, &block) + Subscriber.new(queue).bind(pattern).subscribe(&block) end end @@ -109,14 +104,13 @@ module ActiveSupport @queue = queue end - def bind(listener, pattern) - @listener = listener - @pattern = pattern + def bind(pattern) + @pattern = pattern self end def subscribe - @queue.subscribe(@listener, @pattern) do |*args| + @queue.subscribe(@pattern) do |*args| yield(*args) end end @@ -144,48 +138,6 @@ module ActiveSupport end end - class AsyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end - - def drained? - @queue.size.zero? - end - end - - class SyncListener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @subscriber.call(*args.unshift(name)) - end - end - - def drained? - true - end - end - # This is a default queue implementation that ships with Notifications. It # consumes events in a thread and publish them to all registered subscribers. # @@ -198,16 +150,40 @@ module ActiveSupport @listeners.each { |l| l.publish(*args) } end - def subscribe(listener, pattern=nil, &block) - @listeners << listener.new(pattern, &block) + def subscribe(pattern=nil, &block) + @listeners << Listener.new(pattern, &block) end def drained? @listeners.all? &:drained? end + + class Listener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + @queue = Queue.new + Thread.new { consume } + end + + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @queue << args.unshift(name) + end + end + + def consume + while args = @queue.shift + @subscriber.call(*args) + end + end + + def drained? + @queue.size.zero? + end + end end end - Notifications.queue = Notifications::LittleFanout.new - Notifications.listener = Notifications::AsyncListener + Notifications.queue = Notifications::LittleFanout.new end diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb index 35d44367cf..01106e83e9 100644 --- a/activesupport/test/notifications_test.rb +++ b/activesupport/test/notifications_test.rb @@ -176,21 +176,6 @@ class NotificationsMainTest < Test::Unit::TestCase assert_equal 1, @another.first.result end - def test_subscriber_allows_sync_listeners - @another = [] - ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args| - @another << ActiveSupport::Notifications::Event.new(*args) - end - - Thread.expects(:new).never - ActiveSupport::Notifications.instrument(:something){ 0 } - ActiveSupport::Notifications.instrument(:cache){ 1 } - - assert_equal 1, @another.size - assert_equal :cache, @another.first.name - assert_equal 1, @another.first.result - end - def test_with_several_consumers_and_several_events @another = [] ActiveSupport::Notifications.subscribe do |*args| @@ -216,6 +201,6 @@ class NotificationsMainTest < Test::Unit::TestCase private def drain - sleep(0.05) until ActiveSupport::Notifications.queue.drained? + sleep(0.1) until ActiveSupport::Notifications.queue.drained? end end -- cgit v1.2.3