aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--activesupport/lib/active_support/notifications/fanout.rb61
-rw-r--r--activesupport/test/notifications_test.rb10
2 files changed, 49 insertions, 22 deletions
diff --git a/activesupport/lib/active_support/notifications/fanout.rb b/activesupport/lib/active_support/notifications/fanout.rb
index 412d977b25..bb07e4765c 100644
--- a/activesupport/lib/active_support/notifications/fanout.rb
+++ b/activesupport/lib/active_support/notifications/fanout.rb
@@ -6,7 +6,8 @@ module ActiveSupport
# consumes events in a thread and publish them to all registered subscribers.
#
class Fanout
- def initialize
+ def initialize(sync = false)
+ @subscriber_klass = sync ? Subscriber : AsyncSubscriber
@subscribers = []
end
@@ -15,7 +16,7 @@ module ActiveSupport
end
def subscribe(pattern = nil, &block)
- @subscribers << Subscriber.new(pattern, &block)
+ @subscribers << @subscriber_klass.new(pattern, &block)
end
def publish(*args)
@@ -29,7 +30,14 @@ module ActiveSupport
# Used for internal implementation only.
class Binding #:nodoc:
def initialize(queue, pattern)
- @queue, @pattern = queue, pattern
+ @queue = queue
+ @pattern =
+ case pattern
+ when Regexp, NilClass
+ pattern
+ else
+ /^#{Regexp.escape(pattern.to_s)}/
+ end
end
def subscribe(&block)
@@ -37,33 +45,40 @@ module ActiveSupport
end
end
- # Used for internal implementation only.
class Subscriber #:nodoc:
def initialize(pattern, &block)
- @pattern =
- case pattern
- when Regexp, NilClass
- pattern
- else
- /^#{Regexp.escape(pattern.to_s)}/
- end
+ @pattern = pattern
@block = block
- @events = Queue.new
- start_consumer
end
- def publish(name, *args)
- push(name, args) if matches?(name)
+ def publish(*args)
+ push(*args) if matches?(args.first)
end
- def consume
- while args = @events.shift
+ def drained?
+ true
+ end
+
+ private
+ def matches?(name)
+ !@pattern || @pattern =~ name.to_s
+ end
+
+ def push(*args)
@block.call(*args)
end
+ end
+
+ # Used for internal implementation only.
+ class AsyncSubscriber < Subscriber #:nodoc:
+ def initialize(pattern, &block)
+ super
+ @events = Queue.new
+ start_consumer
end
def drained?
- @events.size.zero?
+ @events.empty?
end
private
@@ -71,12 +86,14 @@ module ActiveSupport
Thread.new { consume }
end
- def matches?(name)
- !@pattern || @pattern =~ name.to_s
+ def consume
+ while args = @events.shift
+ @block.call(*args)
+ end
end
- def push(name, args)
- @events << args.unshift(name)
+ def push(*args)
+ @events << args
end
end
end
diff --git a/activesupport/test/notifications_test.rb b/activesupport/test/notifications_test.rb
index 93c61b2c83..4f880d0db7 100644
--- a/activesupport/test/notifications_test.rb
+++ b/activesupport/test/notifications_test.rb
@@ -71,6 +71,16 @@ module Notifications
end
end
+ class SyncPubSubTest < PubSubTest
+ def setup
+ Thread.abort_on_exception = true
+
+ @notifier = ActiveSupport::Notifications::Notifier.new(ActiveSupport::Notifications::Fanout.new(true))
+ @events = []
+ @notifier.subscribe { |*args| @events << event(*args) }
+ end
+ end
+
class InstrumentationTest < TestCase
def test_instrument_returns_block_result
assert_equal 2, @notifier.instrument(:awesome) { 1 + 1 }