From 6f7fc5824f2033c0f674b002dbee7f1c3f3384ac Mon Sep 17 00:00:00 2001
From: Jeremy Kemper <jeremy@bitsweat.net>
Date: Tue, 24 Nov 2009 19:26:13 -0800
Subject: Revert "Create SyncListener. Since they do not rely on Thread, they
 can be used on Google App Engine."

Take a step back on this API direction.

This reverts commit 8104f65c3225453d13307c3c2733c2a8f99e491a.
---
 activesupport/lib/active_support/notifications.rb | 92 +++++++++--------------
 activesupport/test/notifications_test.rb          | 17 +----
 2 files changed, 35 insertions(+), 74 deletions(-)

(limited to 'activesupport')

diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index e2540cd598..7a9f76b26a 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -41,7 +41,7 @@ module ActiveSupport
   # to subscribers in a thread. You can use any queue implementation you want.
   #
   module Notifications
-    mattr_accessor :queue, :listener
+    mattr_accessor :queue
 
     class << self
       delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
@@ -54,13 +54,8 @@ module ActiveSupport
         @publisher ||= Publisher.new(queue)
       end
 
-      def subscriber
-        @subscriber ||= Subscriber.new(queue)
-      end
-
-      def subscribe(pattern=nil, options={}, &block)
-        with = options[:with] || listener
-        subscriber.bind(with, pattern).subscribe(&block)
+      def subscribe(pattern=nil, &block)
+        Subscriber.new(queue).bind(pattern).subscribe(&block)
       end
     end
 
@@ -109,14 +104,13 @@ module ActiveSupport
         @queue = queue
       end
 
-      def bind(listener, pattern)
-        @listener = listener
-        @pattern  = pattern
+      def bind(pattern)
+        @pattern = pattern
         self
       end
 
       def subscribe
-        @queue.subscribe(@listener, @pattern) do |*args|
+        @queue.subscribe(@pattern) do |*args|
           yield(*args)
         end
       end
@@ -144,48 +138,6 @@ module ActiveSupport
       end
     end
 
-    class AsyncListener
-      def initialize(pattern, &block)
-        @pattern = pattern
-        @subscriber = block
-        @queue = Queue.new
-        Thread.new { consume }
-      end
-
-      def publish(name, *args)
-        if !@pattern || @pattern === name.to_s
-          @queue << args.unshift(name)
-        end
-      end
-
-      def consume
-        while args = @queue.shift
-          @subscriber.call(*args)
-        end
-      end
-
-      def drained?
-        @queue.size.zero?
-      end
-    end
-
-    class SyncListener
-      def initialize(pattern, &block)
-        @pattern = pattern
-        @subscriber = block
-      end
-
-      def publish(name, *args)
-        if !@pattern || @pattern === name.to_s
-          @subscriber.call(*args.unshift(name))
-        end
-      end
-
-      def drained?
-        true
-      end
-    end
-
     # This is a default queue implementation that ships with Notifications. It
     # consumes events in a thread and publish them to all registered subscribers.
     #
@@ -198,16 +150,40 @@ module ActiveSupport
         @listeners.each { |l| l.publish(*args) }
       end
 
-      def subscribe(listener, pattern=nil, &block)
-        @listeners << listener.new(pattern, &block)
+      def subscribe(pattern=nil, &block)
+        @listeners << Listener.new(pattern, &block)
       end
 
       def drained?
         @listeners.all? &:drained?
       end
+
+      class Listener
+        def initialize(pattern, &block)
+          @pattern = pattern
+          @subscriber = block
+          @queue = Queue.new
+          Thread.new { consume }
+        end
+
+        def publish(name, *args)
+          if !@pattern || @pattern === name.to_s
+            @queue << args.unshift(name)
+          end
+        end
+
+        def consume
+          while args = @queue.shift
+            @subscriber.call(*args)
+          end
+        end
+
+        def drained?
+          @queue.size.zero?
+        end
+      end
     end
   end
 
-  Notifications.queue    = Notifications::LittleFanout.new
-  Notifications.listener = Notifications::AsyncListener
+  Notifications.queue = Notifications::LittleFanout.new
 end
diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb
index 35d44367cf..01106e83e9 100644
--- a/activesupport/test/notifications_test.rb
+++ b/activesupport/test/notifications_test.rb
@@ -176,21 +176,6 @@ class NotificationsMainTest < Test::Unit::TestCase
     assert_equal 1, @another.first.result
   end
 
-  def test_subscriber_allows_sync_listeners
-    @another = []
-    ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args|
-      @another << ActiveSupport::Notifications::Event.new(*args)
-    end
-
-    Thread.expects(:new).never
-    ActiveSupport::Notifications.instrument(:something){ 0 }
-    ActiveSupport::Notifications.instrument(:cache){ 1 }
-
-    assert_equal 1, @another.size
-    assert_equal :cache, @another.first.name
-    assert_equal 1, @another.first.result
-  end
-
   def test_with_several_consumers_and_several_events
     @another = []
     ActiveSupport::Notifications.subscribe do |*args|
@@ -216,6 +201,6 @@ class NotificationsMainTest < Test::Unit::TestCase
 
   private
     def drain
-      sleep(0.05) until ActiveSupport::Notifications.queue.drained?
+      sleep(0.1) until ActiveSupport::Notifications.queue.drained?
     end
 end
-- 
cgit v1.2.3