diff options
Diffstat (limited to 'activesupport/lib/active_support/notifications.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications.rb | 100 |
1 files changed, 61 insertions, 39 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb index 6304f496f5..e2540cd598 100644 --- a/activesupport/lib/active_support/notifications.rb +++ b/activesupport/lib/active_support/notifications.rb @@ -41,7 +41,7 @@ module ActiveSupport # to subscribers in a thread. You can use any queue implementation you want. # module Notifications - mattr_accessor :queue + mattr_accessor :queue, :listener class << self delegate :instrument, :transaction_id, :transaction, :to => :instrumenter @@ -54,8 +54,13 @@ module ActiveSupport @publisher ||= Publisher.new(queue) end - def subscribe(pattern=nil, &block) - Subscriber.new(queue).bind(pattern).subscribe(&block) + def subscriber + @subscriber ||= Subscriber.new(queue) + end + + def subscribe(pattern=nil, options={}, &block) + with = options[:with] || listener + subscriber.bind(with, pattern).subscribe(&block) end end @@ -104,13 +109,14 @@ module ActiveSupport @queue = queue end - def bind(pattern) - @pattern = pattern + def bind(listener, pattern) + @listener = listener + @pattern = pattern self end def subscribe - @queue.subscribe(@pattern) do |*args| + @queue.subscribe(@listener, @pattern) do |*args| yield(*args) end end @@ -138,54 +144,70 @@ module ActiveSupport end end - # This is a default queue implementation that ships with Notifications. It - # consumes events in a thread and publish them to all registered subscribers. - # - class LittleFanout - def initialize - @listeners = [] - @stream = Queue.new + class AsyncListener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + @queue = Queue.new Thread.new { consume } end - def publish(*args) - @stream.push(args) - end - - def subscribe(pattern=nil, &block) - @listeners << Listener.new(pattern, &block) + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @queue << args.unshift(name) + end end def consume - while args = @stream.shift - @listeners.each { |l| l.publish(*args) } + while args = @queue.shift + @subscriber.call(*args) end end - class Listener - # attr_reader :thread + def drained? + @queue.size.zero? + end + end - def initialize(pattern, &block) - @pattern = pattern - @subscriber = block - @queue = Queue.new - Thread.new { consume } - end + class SyncListener + def initialize(pattern, &block) + @pattern = pattern + @subscriber = block + end - def publish(name, *args) - if !@pattern || @pattern === name.to_s - @queue << args.unshift(name) - end + def publish(name, *args) + if !@pattern || @pattern === name.to_s + @subscriber.call(*args.unshift(name)) end + end - def consume - while args = @queue.shift - @subscriber.call(*args) - end - end + def drained? + true + end + end + + # This is a default queue implementation that ships with Notifications. It + # consumes events in a thread and publish them to all registered subscribers. + # + class LittleFanout + def initialize + @listeners = [] + end + + def publish(*args) + @listeners.each { |l| l.publish(*args) } + end + + def subscribe(listener, pattern=nil, &block) + @listeners << listener.new(pattern, &block) + end + + def drained? + @listeners.all? &:drained? end end end - Notifications.queue = Notifications::LittleFanout.new + Notifications.queue = Notifications::LittleFanout.new + Notifications.listener = Notifications::AsyncListener end |