From 327545c3ae904d1a9c67de3e280c182ed6418023 Mon Sep 17 00:00:00 2001 From: Jeremy Kemper Date: Sun, 29 Nov 2009 02:30:35 -0800 Subject: Notifications: synchronous fanout queue pushes events to subscribers rather than having them concurrently pull --- .../lib/active_support/notifications/fanout.rb | 61 ++++++++++++++-------- activesupport/test/notifications_test.rb | 10 ++++ 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb index 412d977b25..bb07e4765c 100644 --- a/activesupport/lib/active_support/notifications/fanout.rb +++ b/activesupport/lib/active_support/notifications/fanout.rb @@ -6,7 +6,8 @@ module ActiveSupport # consumes events in a thread and publish them to all registered subscribers. # class Fanout - def initialize + def initialize(sync = false) + @subscriber_klass = sync ? Subscriber : AsyncSubscriber @subscribers = [] end @@ -15,7 +16,7 @@ module ActiveSupport end def subscribe(pattern = nil, &block) - @subscribers << Subscriber.new(pattern, &block) + @subscribers << @subscriber_klass.new(pattern, &block) end def publish(*args) @@ -29,7 +30,14 @@ module ActiveSupport # Used for internal implementation only. class Binding #:nodoc: def initialize(queue, pattern) - @queue, @pattern = queue, pattern + @queue = queue + @pattern = + case pattern + when Regexp, NilClass + pattern + else + /^#{Regexp.escape(pattern.to_s)}/ + end end def subscribe(&block) @@ -37,33 +45,40 @@ module ActiveSupport end end - # Used for internal implementation only. class Subscriber #:nodoc: def initialize(pattern, &block) - @pattern = - case pattern - when Regexp, NilClass - pattern - else - /^#{Regexp.escape(pattern.to_s)}/ - end + @pattern = pattern @block = block - @events = Queue.new - start_consumer end - def publish(name, *args) - push(name, args) if matches?(name) + def publish(*args) + push(*args) if matches?(args.first) end - def consume - while args = @events.shift + def drained? + true + end + + private + def matches?(name) + !@pattern || @pattern =~ name.to_s + end + + def push(*args) @block.call(*args) end + end + + # Used for internal implementation only. + class AsyncSubscriber < Subscriber #:nodoc: + def initialize(pattern, &block) + super + @events = Queue.new + start_consumer end def drained? - @events.size.zero? + @events.empty? end private @@ -71,12 +86,14 @@ module ActiveSupport Thread.new { consume } end - def matches?(name) - !@pattern || @pattern =~ name.to_s + def consume + while args = @events.shift + @block.call(*args) + end end - def push(name, args) - @events << args.unshift(name) + def push(*args) + @events << args end end end diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb index 93c61b2c83..4f880d0db7 100644 --- a/activesupport/test/notifications_test.rb +++ b/activesupport/test/notifications_test.rb @@ -71,6 +71,16 @@ module Notifications end end + class SyncPubSubTest < PubSubTest + def setup + Thread.abort_on_exception = true + + @notifier = ActiveSupport::Notifications::Notifier.new(ActiveSupport::Notifications::Fanout.new(true)) + @events = [] + @notifier.subscribe { |*args| @events << event(*args) } + end + end + class InstrumentationTest < TestCase def test_instrument_returns_block_result assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 } -- cgit v1.2.3