aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJeremy Kemper <jeremy@bitsweat.net>2009-11-24 19:26:13 -0800
committerJeremy Kemper <jeremy@bitsweat.net>2009-11-28 12:50:09 -0800
commit6f7fc5824f2033c0f674b002dbee7f1c3f3384ac (patch)
tree6d838011ebc6e8465a31bd1dbebecb7831cdbe26
parenteeb1afa20003050039eab7420003a775343754c1 (diff)
downloadrails-6f7fc5824f2033c0f674b002dbee7f1c3f3384ac.tar.gz
rails-6f7fc5824f2033c0f674b002dbee7f1c3f3384ac.tar.bz2
rails-6f7fc5824f2033c0f674b002dbee7f1c3f3384ac.zip
Revert "Create SyncListener. Since they do not rely on Thread, they can be used on Google App Engine."
Take a step back on this API direction. This reverts commit 8104f65c3225453d13307c3c2733c2a8f99e491a.
-rw-r--r--activesupport/lib/active_support/notifications.rb92
-rw-r--r--activesupport/test/notifications_test.rb17
2 files changed, 35 insertions, 74 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index e2540cd598..7a9f76b26a 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -41,7 +41,7 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue, :listener
+ mattr_accessor :queue
class << self
delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
@@ -54,13 +54,8 @@ module ActiveSupport
@publisher ||= Publisher.new(queue)
end
- def subscriber
- @subscriber ||= Subscriber.new(queue)
- end
-
- def subscribe(pattern=nil, options={}, &block)
- with = options[:with] || listener
- subscriber.bind(with, pattern).subscribe(&block)
+ def subscribe(pattern=nil, &block)
+ Subscriber.new(queue).bind(pattern).subscribe(&block)
end
end
@@ -109,14 +104,13 @@ module ActiveSupport
@queue = queue
end
- def bind(listener, pattern)
- @listener = listener
- @pattern = pattern
+ def bind(pattern)
+ @pattern = pattern
self
end
def subscribe
- @queue.subscribe(@listener, @pattern) do |*args|
+ @queue.subscribe(@pattern) do |*args|
yield(*args)
end
end
@@ -144,48 +138,6 @@ module ActiveSupport
end
end
- class AsyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
-
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
-
- def drained?
- @queue.size.zero?
- end
- end
-
- class SyncListener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @subscriber.call(*args.unshift(name))
- end
- end
-
- def drained?
- true
- end
- end
-
# This is a default queue implementation that ships with Notifications. It
# consumes events in a thread and publish them to all registered subscribers.
#
@@ -198,16 +150,40 @@ module ActiveSupport
@listeners.each { |l| l.publish(*args) }
end
- def subscribe(listener, pattern=nil, &block)
- @listeners << listener.new(pattern, &block)
+ def subscribe(pattern=nil, &block)
+ @listeners << Listener.new(pattern, &block)
end
def drained?
@listeners.all? &:drained?
end
+
+ class Listener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ @queue = Queue.new
+ Thread.new { consume }
+ end
+
+ def publish(name, *args)
+ if !@pattern || @pattern === name.to_s
+ @queue << args.unshift(name)
+ end
+ end
+
+ def consume
+ while args = @queue.shift
+ @subscriber.call(*args)
+ end
+ end
+
+ def drained?
+ @queue.size.zero?
+ end
+ end
end
end
- Notifications.queue = Notifications::LittleFanout.new
- Notifications.listener = Notifications::AsyncListener
+ Notifications.queue = Notifications::LittleFanout.new
end
diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb
index 35d44367cf..01106e83e9 100644
--- a/activesupport/test/notifications_test.rb
+++ b/activesupport/test/notifications_test.rb
@@ -176,21 +176,6 @@ class NotificationsMainTest < Test::Unit::TestCase
assert_equal 1, @another.first.result
end
- def test_subscriber_allows_sync_listeners
- @another = []
- ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args|
- @another << ActiveSupport::Notifications::Event.new(*args)
- end
-
- Thread.expects(:new).never
- ActiveSupport::Notifications.instrument(:something){ 0 }
- ActiveSupport::Notifications.instrument(:cache){ 1 }
-
- assert_equal 1, @another.size
- assert_equal :cache, @another.first.name
- assert_equal 1, @another.first.result
- end
-
def test_with_several_consumers_and_several_events
@another = []
ActiveSupport::Notifications.subscribe do |*args|
@@ -216,6 +201,6 @@ class NotificationsMainTest < Test::Unit::TestCase
private
def drain
- sleep(0.05) until ActiveSupport::Notifications.queue.drained?
+ sleep(0.1) until ActiveSupport::Notifications.queue.drained?
end
end