diff options
author | twinturbo <me@broadcastingadam.com> | 2012-04-28 22:33:11 -0700 |
---|---|---|
committer | twinturbo <me@broadcastingadam.com> | 2012-04-28 22:33:11 -0700 |
commit | d7532c189f26e008fc6ef50336ed5e8168b8221c (patch) | |
tree | 681436aa5d80ed5e4c9a867ccc891eab6852f124 /railties/lib/rails/queueing.rb | |
parent | 2c136ae0543ea53dcb3a3ef9372b216b54d66172 (diff) | |
parent | 6659252d9f0e6d77bee268adf587e03cfeb8f9ad (diff) | |
download | rails-d7532c189f26e008fc6ef50336ed5e8168b8221c.tar.gz rails-d7532c189f26e008fc6ef50336ed5e8168b8221c.tar.bz2 rails-d7532c189f26e008fc6ef50336ed5e8168b8221c.zip |
Merge branch 'master' of github.com:lifo/docrails
Diffstat (limited to 'railties/lib/rails/queueing.rb')
-rw-r--r-- | railties/lib/rails/queueing.rb | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/railties/lib/rails/queueing.rb b/railties/lib/rails/queueing.rb new file mode 100644 index 0000000000..b77940f821 --- /dev/null +++ b/railties/lib/rails/queueing.rb @@ -0,0 +1,65 @@ +module Rails + module Queueing + # In test mode, the Rails queue is backed by an Array so that assertions + # can be made about its contents. The test queue provides a +contents+ + # method to make assertions about the queue's contents and a +drain+ + # method to drain the queue and run the jobs. + # + # Jobs are run in a separate thread to catch mistakes where code + # assumes that the job is run in the same thread. + class TestQueue + attr_reader :contents + + def initialize + @contents = [] + end + + def drain + # run the jobs in a separate thread so assumptions of synchronous + # jobs are caught in test mode. + t = Thread.new do + while job = @contents.pop + job.run + end + end + t.join + end + + # implement the Queue API + def push(object) + @contents << object + end + end + + # The threaded consumer will run jobs in a background thread in + # development mode or in a VM where running jobs on a thread in + # production mode makes sense. + # + # When the process exits, the consumer pushes a nil onto the + # queue and joins the thread, which will ensure that all jobs + # are executed before the process finally dies. + class ThreadedConsumer + def self.start(queue) + new(queue).start + end + + def initialize(queue) + @queue = queue + end + + def start + @thread = Thread.new do + while job = @queue.pop + job.run + end + end + self + end + + def shutdown + @queue.push nil + @thread.join + end + end + end +end |