aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support/notifications.rb
diff options
context:
space:
mode:
authorMikel Lindsaar <raasdnil@gmail.com>2009-11-24 21:45:14 +1100
committerMikel Lindsaar <raasdnil@gmail.com>2009-11-24 21:45:14 +1100
commit5f2395041d1578433fa825ed5c6f26a201f2203d (patch)
tree3b78531ae77a2173ad0df1103543bdfea0b1f60f /activesupport/lib/active_support/notifications.rb
parent3a72923e27195983d37bdb39ef26b29cf03d3483 (diff)
parente62e6d409986cd5c99234689aa49e3162d7b3a59 (diff)
downloadrails-5f2395041d1578433fa825ed5c6f26a201f2203d.tar.gz
rails-5f2395041d1578433fa825ed5c6f26a201f2203d.tar.bz2
rails-5f2395041d1578433fa825ed5c6f26a201f2203d.zip
Merge branch 'master' of git://github.com/rails/rails into rails_master
Diffstat (limited to 'activesupport/lib/active_support/notifications.rb')
-rw-r--r--activesupport/lib/active_support/notifications.rb102
1 files changed, 62 insertions, 40 deletions
diff --git a/activesupport/lib/active_support/notifications.rb b/activesupport/lib/active_support/notifications.rb
index 9eae3bebe2..e2540cd598 100644
--- a/activesupport/lib/active_support/notifications.rb
+++ b/activesupport/lib/active_support/notifications.rb
@@ -41,7 +41,7 @@ module ActiveSupport
# to subscribers in a thread. You can use any queue implementation you want.
#
module Notifications
- mattr_accessor :queue
+ mattr_accessor :queue, :listener
class << self
delegate :instrument, :transaction_id, :transaction, :to => :instrumenter
@@ -54,8 +54,13 @@ module ActiveSupport
@publisher ||= Publisher.new(queue)
end
- def subscribe(pattern=nil, &block)
- Subscriber.new(queue).bind(pattern).subscribe(&block)
+ def subscriber
+ @subscriber ||= Subscriber.new(queue)
+ end
+
+ def subscribe(pattern=nil, options={}, &block)
+ with = options[:with] || listener
+ subscriber.bind(with, pattern).subscribe(&block)
end
end
@@ -104,14 +109,15 @@ module ActiveSupport
@queue = queue
end
- def bind(pattern)
- @pattern = pattern
+ def bind(listener, pattern)
+ @listener = listener
+ @pattern = pattern
self
end
def subscribe
- @queue.subscribe(@pattern) do |*args|
- yield *args
+ @queue.subscribe(@listener, @pattern) do |*args|
+ yield(*args)
end
end
end
@@ -138,54 +144,70 @@ module ActiveSupport
end
end
- # This is a default queue implementation that ships with Notifications. It
- # consumes events in a thread and publish them to all registered subscribers.
- #
- class LittleFanout
- def initialize
- @listeners = []
- @stream = Queue.new
+ class AsyncListener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ @queue = Queue.new
Thread.new { consume }
end
- def publish(*args)
- @stream.push(args)
- end
-
- def subscribe(pattern=nil, &block)
- @listeners << Listener.new(pattern, &block)
+ def publish(name, *args)
+ if !@pattern || @pattern === name.to_s
+ @queue << args.unshift(name)
+ end
end
def consume
- while args = @stream.shift
- @listeners.each { |l| l.publish(*args) }
+ while args = @queue.shift
+ @subscriber.call(*args)
end
end
- class Listener
- # attr_reader :thread
+ def drained?
+ @queue.size.zero?
+ end
+ end
- def initialize(pattern, &block)
- @pattern = pattern
- @subscriber = block
- @queue = Queue.new
- Thread.new { consume }
- end
+ class SyncListener
+ def initialize(pattern, &block)
+ @pattern = pattern
+ @subscriber = block
+ end
- def publish(name, *args)
- if !@pattern || @pattern === name.to_s
- @queue << args.unshift(name)
- end
+ def publish(name, *args)
+ if !@pattern || @pattern === name.to_s
+ @subscriber.call(*args.unshift(name))
end
+ end
- def consume
- while args = @queue.shift
- @subscriber.call(*args)
- end
- end
+ def drained?
+ true
+ end
+ end
+
+ # This is a default queue implementation that ships with Notifications. It
+ # consumes events in a thread and publish them to all registered subscribers.
+ #
+ class LittleFanout
+ def initialize
+ @listeners = []
+ end
+
+ def publish(*args)
+ @listeners.each { |l| l.publish(*args) }
+ end
+
+ def subscribe(listener, pattern=nil, &block)
+ @listeners << listener.new(pattern, &block)
+ end
+
+ def drained?
+ @listeners.all? &:drained?
end
end
end
- Notifications.queue = Notifications::LittleFanout.new
+ Notifications.queue = Notifications::LittleFanout.new
+ Notifications.listener = Notifications::AsyncListener
end