From 6eb163a706635c0ab5ede9056a0720f1a5f18f22 Mon Sep 17 00:00:00 2001 From: Cristian Bica Date: Thu, 22 May 2014 23:38:01 +0300 Subject: Tagged logging --- lib/active_job/base.rb | 4 +- lib/active_job/identifier.rb | 16 ++++++++ lib/active_job/logging.rb | 50 ++++++++++++++++++----- lib/active_job/queue_adapters/sneakers_adapter.rb | 6 +-- 4 files changed, 61 insertions(+), 15 deletions(-) create mode 100644 lib/active_job/identifier.rb (limited to 'lib/active_job') diff --git a/lib/active_job/base.rb b/lib/active_job/base.rb index 27733577e4..0c772e3126 100644 --- a/lib/active_job/base.rb +++ b/lib/active_job/base.rb @@ -2,8 +2,9 @@ require 'active_job/queue_adapter' require 'active_job/queue_name' require 'active_job/enqueuing' require 'active_job/execution' -require 'active_job/logging' require 'active_job/callbacks' +require 'active_job/identifier' +require 'active_job/logging' module ActiveJob class Base @@ -13,6 +14,7 @@ module ActiveJob include Enqueuing include Execution include Callbacks + include Identifier include Logging ActiveSupport.run_load_hooks(:active_job, self) diff --git a/lib/active_job/identifier.rb b/lib/active_job/identifier.rb new file mode 100644 index 0000000000..af32b03404 --- /dev/null +++ b/lib/active_job/identifier.rb @@ -0,0 +1,16 @@ +require 'active_job/arguments' + +module ActiveJob + module Identifier + extend ActiveSupport::Concern + + included do + attr_writer :job_id + end + + def job_id + @job_id ||= SecureRandom.uuid + end + + end +end diff --git a/lib/active_job/logging.rb b/lib/active_job/logging.rb index ede1606027..ea0ba2dddc 100644 --- a/lib/active_job/logging.rb +++ b/lib/active_job/logging.rb @@ -3,26 +3,51 @@ require 'active_support/core_ext/string/filters' module ActiveJob module Logging extend ActiveSupport::Concern - + included do cattr_accessor(:logger) { ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT)) } + around_enqueue do |job, block, _| + tag_logger do + block.call + end + end + + around_perform do |job, block, _| + tag_logger(job.class.name, job.job_id) do + payload = {adapter: job.class.queue_adapter, job: job.class, args: job.arguments} + ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup) + ActiveSupport::Notifications.instrument("perform.active_job", payload) do |payload| + block.call + end + end + end + before_enqueue do |job| if job.enqueued_at - ActiveSupport::Notifications.instrument "enqueue_at.active_job", + ActiveSupport::Notifications.instrument "enqueue_at.active_job", adapter: job.class.queue_adapter, job: job.class, args: job.arguments, timestamp: job.enqueued_at else ActiveSupport::Notifications.instrument "enqueue.active_job", adapter: job.class.queue_adapter, job: job.class, args: job.arguments end end - - before_perform do |job| - ActiveSupport::Notifications.instrument "perform.active_job", - adapter: job.class.queue_adapter, job: job.class, args: job.arguments - end end - + + private + def tag_logger(*tags) + if logger.respond_to?(:tagged) + tags.unshift "ActiveJob" unless logger_tagged_by_active_job? + ActiveJob::Base.logger.tagged(*tags){ yield } + else + yield + end + end + + def logger_tagged_by_active_job? + logger.formatter.current_tags.include?("ActiveJob") + end + class LogSubscriber < ActiveSupport::LogSubscriber def enqueue(event) info "Enqueued #{event.payload[:job].name} to #{queue_name(event)}" + args_info(event) @@ -32,10 +57,13 @@ module ActiveJob info "Enqueued #{event.payload[:job].name} to #{queue_name(event)} at #{enqueued_at(event)}" + args_info(event) end - def perform(event) - info "Performed #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) + def perform_start(event) + info "Performing #{event.payload[:job].name} from #{queue_name(event)}" + args_info(event) end + def perform(event) + info "Performed #{event.payload[:job].name} from #{queue_name(event)} in #{event.duration.round(2).to_s}ms" + end private def queue_name(event) @@ -43,7 +71,7 @@ module ActiveJob end def args_info(event) - event.payload[:args].any? ? ": #{event.payload[:args].inspect}" : "" + event.payload[:args].any? ? " with arguments: #{event.payload[:args].map(&:inspect).join(", ")}" : "" end def enqueued_at(event) diff --git a/lib/active_job/queue_adapters/sneakers_adapter.rb b/lib/active_job/queue_adapters/sneakers_adapter.rb index 3ca7745506..f7da691935 100644 --- a/lib/active_job/queue_adapters/sneakers_adapter.rb +++ b/lib/active_job/queue_adapters/sneakers_adapter.rb @@ -4,11 +4,11 @@ require 'thread' module ActiveJob module QueueAdapters class SneakersAdapter - @mutex = Mutex.new - + @monitor = Monitor.new + class << self def enqueue(job, *args) - @mutex.synchronize do + @monitor.synchronize do JobWrapper.from_queue job.queue_name JobWrapper.enqueue [ job, *args ] end -- cgit v1.2.3