aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJosé Valim <jose.valim@gmail.com>2009-11-19 14:50:27 -0200
committerYehuda Katz <wycats@Yehuda-Katz.local>2009-11-23 09:08:17 -0800
commit8104f65c3225453d13307c3c2733c2a8f99e491a (patch)
tree433b697be24ca3f8cfeab45add6a5e1939a668fe
parent0f9029ec4854c8dfa14e75ea646708ba5135fbf2 (diff)
downloadrails-8104f65c3225453d13307c3c2733c2a8f99e491a.tar.gz
rails-8104f65c3225453d13307c3c2733c2a8f99e491a.tar.bz2
rails-8104f65c3225453d13307c3c2733c2a8f99e491a.zip
Create SyncListener. Since they do not rely on Thread, they can be used on Google App Engine.
Signed-off-by: Yehuda Katz <wycats@Yehuda-Katz.local>
-rw-r--r--activesupport/lib/active_support/notifications.rb92
-rw-r--r--activesupport/test/notifications_test.rb17
2 files changed, 74 insertions, 35 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index 7a9f76b26a..e2540cd598 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -41,7 +41,7 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue
+ mattr_accessor :queue, :listener
class << self
delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
@@ -54,8 +54,13 @@ module ActiveSupport
@publisher ||= Publisher.new(queue)
end
- def subscribe(pattern=nil, &block)
- Subscriber.new(queue).bind(pattern).subscribe(&block)
+ def subscriber
+ @subscriber ||= Subscriber.new(queue)
+ end
+
+ def subscribe(pattern=nil, options={}, &block)
+ with = options[:with] || listener
+ subscriber.bind(with, pattern).subscribe(&block)
end
end
@@ -104,13 +109,14 @@ module ActiveSupport
@queue = queue
end
- def bind(pattern)
- @pattern = pattern
+ def bind(listener, pattern)
+ @listener = listener
+ @pattern = pattern
self
end
def subscribe
- @queue.subscribe(@pattern) do |*args|
+ @queue.subscribe(@listener, @pattern) do |*args|
yield(*args)
end
end
@@ -138,6 +144,48 @@ module ActiveSupport
end
end
+ class AsyncListener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ @queue = Queue.new
+ Thread.new { consume }
+ end
+
+ def publish(name, *args)
+ if !@pattern || @pattern === name.to_s
+ @queue << args.unshift(name)
+ end
+ end
+
+ def consume
+ while args = @queue.shift
+ @subscriber.call(*args)
+ end
+ end
+
+ def drained?
+ @queue.size.zero?
+ end
+ end
+
+ class SyncListener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ end
+
+ def publish(name, *args)
+ if !@pattern || @pattern === name.to_s
+ @subscriber.call(*args.unshift(name))
+ end
+ end
+
+ def drained?
+ true
+ end
+ end
+
# This is a default queue implementation that ships with Notifications. It
# consumes events in a thread and publish them to all registered subscribers.
#
@@ -150,40 +198,16 @@ module ActiveSupport
@listeners.each { |l| l.publish(*args) }
end
- def subscribe(pattern=nil, &block)
- @listeners << Listener.new(pattern, &block)
+ def subscribe(listener, pattern=nil, &block)
+ @listeners << listener.new(pattern, &block)
end
def drained?
@listeners.all? &:drained?
end
-
- class Listener
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
-
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
- end
-
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
-
- def drained?
- @queue.size.zero?
- end
- end
end
end
- Notifications.queue = Notifications::LittleFanout.new
+ Notifications.queue = Notifications::LittleFanout.new
+ Notifications.listener = Notifications::AsyncListener
end
diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb
index 01106e83e9..35d44367cf 100644
--- a/activesupport/test/notifications_test.rb
+++ b/activesupport/test/notifications_test.rb
@@ -176,6 +176,21 @@ class NotificationsMainTest < Test::Unit::TestCase
assert_equal 1, @another.first.result
end
+ def test_subscriber_allows_sync_listeners
+ @another = []
+ ActiveSupport::Notifications.subscribe(/cache/, :with => ActiveSupport::Notifications::SyncListener) do |*args|
+ @another << ActiveSupport::Notifications::Event.new(*args)
+ end
+
+ Thread.expects(:new).never
+ ActiveSupport::Notifications.instrument(:something){ 0 }
+ ActiveSupport::Notifications.instrument(:cache){ 1 }
+
+ assert_equal 1, @another.size
+ assert_equal :cache, @another.first.name
+ assert_equal 1, @another.first.result
+ end
+
def test_with_several_consumers_and_several_events
@another = []
ActiveSupport::Notifications.subscribe do |*args|
@@ -201,6 +216,6 @@ class NotificationsMainTest < Test::Unit::TestCase
private
def drain
- sleep(0.1) until ActiveSupport::Notifications.queue.drained?
+ sleep(0.05) until ActiveSupport::Notifications.queue.drained?
end
end