From 8104f65c3225453d13307c3c2733c2a8f99e491a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Thu, 19 Nov 2009 14:50:27 -0200 Subject: Create SyncListener. Since they do not rely on Thread, they can be used on Google App Engine. Signed-off-by: Yehuda Katz --- activesupport/lib/active_support/notifications.rb | 92 ++++++++++++++--------- 1 file changed, 58 insertions(+), 34 deletions(-) (limited to 'activesupport/lib/active_support/notifications.rb') diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 7a9f76b26a..e2540cd598 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -41,7 +41,7 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + mattr_accessor :queue, :listener class << self delegate :instrument, :transaction_id, :transaction, :to => :instrumenter @@ -54,8 +54,13 @@ module ActiveSupport @publisher ||= Publisher.new(queue) end - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) + def subscriber + @subscriber ||= Subscriber.new(queue) + end + + def subscribe(pattern=nil, options={}, &block) + with = options[:with] || listener + subscriber.bind(with, pattern).subscribe(&block) end end @@ -104,13 +109,14 @@ module ActiveSupport @queue = queue end - def bind(pattern) - @pattern = pattern + def bind(listener, pattern) + @listener = listener + @pattern = pattern self end def subscribe - @queue.subscribe(@pattern) do |*args| + @queue.subscribe(@listener, @pattern) do |*args| yield(*args) end end @@ -138,6 +144,48 @@ module ActiveSupport end end + class AsyncListener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + @queue = Queue.new + Thread.new { consume } + end + + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @queue << args.unshift(name) + end + end + + def consume + while args = @queue.shift + @subscriber.call(*args) + end + end + + def drained? + @queue.size.zero? + end + end + + class SyncListener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + end + + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @subscriber.call(*args.unshift(name)) + end + end + + def drained? + true + end + end + # This is a default queue implementation that ships with Notifications. It # consumes events in a thread and publish them to all registered subscribers. # @@ -150,40 +198,16 @@ module ActiveSupport @listeners.each { |l| l.publish(*args) } end - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) + def subscribe(listener, pattern=nil, &block) + @listeners << listener.new(pattern, &block) end def drained? @listeners.all? &:drained? end - - class Listener - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end - - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end - end - - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end - - def drained? - @queue.size.zero? - end - end end end - Notifications.queue = Notifications::LittleFanout.new + Notifications.queue = Notifications::LittleFanout.new + Notifications.listener = Notifications::AsyncListener end -- cgit v1.2.3