1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# frozen_string_literal: true
require "active_support/core_ext/hash/transform_values"
require "active_support/core_ext/string/filters"
require "active_support/tagged_logging"
require "active_support/logger"
module ActiveJob
module Logging #:nodoc:
extend ActiveSupport::Concern
included do
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
around_enqueue do |_, block, _|
tag_logger do
block.call
end
end
around_perform do |job, block, _|
tag_logger(job.class.name, job.job_id) do
payload = { adapter: job.class.queue_adapter, job: job }
ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup)
ActiveSupport::Notifications.instrument("perform.active_job", payload) do
block.call
end
end
end
after_enqueue do |job|
if job.scheduled_at
ActiveSupport::Notifications.instrument "enqueue_at.active_job",
adapter: job.class.queue_adapter, job: job
else
ActiveSupport::Notifications.instrument "enqueue.active_job",
adapter: job.class.queue_adapter, job: job
end
end
end
private
def tag_logger(*tags)
if logger.respond_to?(:tagged)
tags.unshift "ActiveJob" unless logger_tagged_by_active_job?
logger.tagged(*tags) { yield }
else
yield
end
end
def logger_tagged_by_active_job?
logger.formatter.current_tags.include?("ActiveJob")
end
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
def enqueue(event)
info do
job = event.payload[:job]
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
end
end
def enqueue_at(event)
info do
job = event.payload[:job]
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
end
end
def perform_start(event)
info do
job = event.payload[:job]
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)}" + args_info(job)
end
end
def perform(event)
job = event.payload[:job]
ex = event.payload[:exception_object]
if ex
error do
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
end
else
info do
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
end
end
end
private
def queue_name(event)
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
end
def args_info(job)
if job.arguments.any?
" with arguments: " +
job.arguments.map { |arg| format(arg).inspect }.join(", ")
else
""
end
end
def format(arg)
case arg
when Hash
arg.transform_values { |value| format(value) }
when Array
arg.map { |value| format(value) }
when GlobalID::Identification
arg.to_global_id rescue arg
else
arg
end
end
def scheduled_at(event)
Time.at(event.payload[:job].scheduled_at).utc
end
def logger
ActiveJob::Base.logger
end
end
end
end
ActiveJob::Logging::LogSubscriber.attach_to :active_job
|