diff options
author | Jeremy Kemper <jeremy@bitsweat.net> | 2009-11-28 12:49:07 -0800 |
---|---|---|
committer | Jeremy Kemper <jeremy@bitsweat.net> | 2009-11-28 12:50:09 -0800 |
commit | 4f2a04cc085b9117e8af8079a95a063f671d7a3d (patch) | |
tree | fd96c9bc52dc13d3338ed08f4afcd1c685dc6348 /activesupport/lib/active_support/notifications/fanout.rb | |
parent | 02893d17053123cbf02b65c2fe549421c11a2604 (diff) | |
download | rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.gz rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.tar.bz2 rails-4f2a04cc085b9117e8af8079a95a063f671d7a3d.zip |
Notifications: extract central Notifier, cordon off the internal Fanout implementation, and segregate instrumentation concerns
Diffstat (limited to 'activesupport/lib/active_support/notifications/fanout.rb')
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 84 |
1 files changed, 84 insertions, 0 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb new file mode 100644 index 0000000000..412d977b25 --- /dev/null +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -0,0 +1,84 @@ +require 'thread' + +module ActiveSupport + module Notifications + # This is a default queue implementation that ships with Notifications. It + # consumes events in a thread and publish them to all registered subscribers. + # + class Fanout + def initialize + @subscribers = [] + end + + def bind(pattern) + Binding.new(self, pattern) + end + + def subscribe(pattern = nil, &block) + @subscribers << Subscriber.new(pattern, &block) + end + + def publish(*args) + @subscribers.each { |s| s.publish(*args) } + end + + def wait + sleep(0.05) until @subscribers.all?(&:drained?) + end + + # Used for internal implementation only. + class Binding #:nodoc: + def initialize(queue, pattern) + @queue, @pattern = queue, pattern + end + + def subscribe(&block) + @queue.subscribe(@pattern, &block) + end + end + + # Used for internal implementation only. + class Subscriber #:nodoc: + def initialize(pattern, &block) + @pattern = + case pattern + when Regexp, NilClass + pattern + else + /^#{Regexp.escape(pattern.to_s)}/ + end + @block = block + @events = Queue.new + start_consumer + end + + def publish(name, *args) + push(name, args) if matches?(name) + end + + def consume + while args = @events.shift + @block.call(*args) + end + end + + def drained? + @events.size.zero? + end + + private + def start_consumer + Thread.new { consume } + end + + def matches?(name) + !@pattern || @pattern =~ name.to_s + end + + def push(name, args) + @events << args.unshift(name) + end + end + end + end +end |