1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
|
# frozen_string_literal: true
require "active_support/core_ext/string/filters"
require "active_support/tagged_logging"
require "active_support/logger"
module ActiveJob
module Logging #:nodoc:
extend ActiveSupport::Concern
included do
cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))
around_enqueue do |_, block|
tag_logger do
block.call
end
end
around_perform do |job, block|
tag_logger(job.class.name, job.job_id) do
payload = { adapter: job.class.queue_adapter, job: job }
ActiveSupport::Notifications.instrument("perform_start.active_job", payload.dup)
ActiveSupport::Notifications.instrument("perform.active_job", payload) do
block.call
end
end
end
around_enqueue do |job, block|
if job.scheduled_at
ActiveSupport::Notifications.instrument("enqueue_at.active_job",
adapter: job.class.queue_adapter, job: job, &block)
else
ActiveSupport::Notifications.instrument("enqueue.active_job",
adapter: job.class.queue_adapter, job: job, &block)
end
end
end
private
def tag_logger(*tags)
if logger.respond_to?(:tagged)
tags.unshift "ActiveJob" unless logger_tagged_by_active_job?
logger.tagged(*tags) { yield }
else
yield
end
end
def logger_tagged_by_active_job?
logger.formatter.current_tags.include?("ActiveJob")
end
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
def enqueue(event)
info do
job = event.payload[:job]
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
end
end
def enqueue_at(event)
info do
job = event.payload[:job]
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
end
end
def perform_start(event)
info do
job = event.payload[:job]
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
end
end
def perform(event)
job = event.payload[:job]
ex = event.payload[:exception_object]
if ex
error do
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
end
else
info do
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
end
end
end
def enqueue_retry(event)
job = event.payload[:job]
ex = event.payload[:error]
wait = event.payload[:wait]
info do
if ex
"Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
else
"Retrying #{job.class} in #{wait.to_i} seconds."
end
end
end
def retry_stopped(event)
job = event.payload[:job]
ex = event.payload[:error]
error do
"Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
end
end
def discard(event)
job = event.payload[:job]
ex = event.payload[:error]
error do
"Discarded #{job.class} due to a #{ex.class}."
end
end
private
def queue_name(event)
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
end
def args_info(job)
if job.arguments.any?
" with arguments: " +
job.arguments.map { |arg| format(arg).inspect }.join(", ")
else
""
end
end
def format(arg)
case arg
when Hash
arg.transform_values { |value| format(value) }
when Array
arg.map { |value| format(value) }
when GlobalID::Identification
arg.to_global_id rescue arg
else
arg
end
end
def scheduled_at(event)
Time.at(event.payload[:job].scheduled_at).utc
end
def logger
ActiveJob::Base.logger
end
end
end
end
ActiveJob::Logging::LogSubscriber.attach_to :active_job
|