diff options
Diffstat (limited to 'activesupport/lib')
-rw-r--r-- | activesupport/lib/active_support/notifications/fanout.rb | 61 |
1 files changed, 39 insertions, 22 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 412d977b25..bb07e4765c 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -6,7 +6,8 @@ module ActiveSupport # consumes events in a thread and publish them to all registered subscribers. # class Fanout - def initialize + def initialize(sync = false) + @subscriber_klass = sync ? Subscriber : AsyncSubscriber @subscribers = [] end @@ -15,7 +16,7 @@ module ActiveSupport end def subscribe(pattern = nil, &block) - @subscribers << Subscriber.new(pattern, &block) + @subscribers << @subscriber_klass.new(pattern, &block) end def publish(*args) @@ -29,7 +30,14 @@ module ActiveSupport # Used for internal implementation only. class Binding #:nodoc: def initialize(queue, pattern) - @queue, @pattern = queue, pattern + @queue = queue + @pattern = + case pattern + when Regexp, NilClass + pattern + else + /^#{Regexp.escape(pattern.to_s)}/ + end end def subscribe(&block) @@ -37,33 +45,40 @@ module ActiveSupport end end - # Used for internal implementation only. class Subscriber #:nodoc: def initialize(pattern, &block) - @pattern = - case pattern - when Regexp, NilClass - pattern - else - /^#{Regexp.escape(pattern.to_s)}/ - end + @pattern = pattern @block = block - @events = Queue.new - start_consumer end - def publish(name, *args) - push(name, args) if matches?(name) + def publish(*args) + push(*args) if matches?(args.first) end - def consume - while args = @events.shift + def drained? + true + end + + private + def matches?(name) + !@pattern || @pattern =~ name.to_s + end + + def push(*args) @block.call(*args) end + end + + # Used for internal implementation only. + class AsyncSubscriber < Subscriber #:nodoc: + def initialize(pattern, &block) + super + @events = Queue.new + start_consumer end def drained? - @events.size.zero? + @events.empty? end private @@ -71,12 +86,14 @@ module ActiveSupport Thread.new { consume } end - def matches?(name) - !@pattern || @pattern =~ name.to_s + def consume + while args = @events.shift + @block.call(*args) + end end - def push(name, args) - @events << args.unshift(name) + def push(*args) + @events << args end end end |