# path: root/activejob/lib/active_job/core.rb
# blob: c4e12fc518a0d429f5647cf634af53bc5d2d06ca
# frozen_string_literal: true

module ActiveJob
  # Provides general behavior that will be included into every Active Job
  # object that inherits from ActiveJob::Base.
  module Core
    extend ActiveSupport::Concern

    included do
      # Arguments the job was created with; these are what get passed on to
      # the perform method.
      attr_accessor :arguments

      # Timestamp when the job should be performed.
      attr_accessor :scheduled_at

      # Identifier of the job.
      attr_accessor :job_id

      # ID optionally provided by the queueing adapter.
      attr_accessor :provider_job_id

      # Number of times this job has been executed (incremented on every
      # retry, e.g. after an exception).
      attr_accessor :executions

      # I18n.locale to be used during the job.
      attr_accessor :locale

      # Arguments in their serialized form, as assigned by +deserialize+.
      # Only a writer is declared here.
      attr_writer :serialized_arguments

      # Queue in which the job will reside. Only the writer is declared here;
      # the matching reader is expected to be defined elsewhere.
      attr_writer :queue_name

      # Priority of the job (a lower value means a higher priority). Only the
      # writer is declared here; the matching reader is expected to be defined
      # elsewhere.
      attr_writer :priority
    end

    # These methods will be included into any Active Job object, adding
    # helpers for de/serialization and creation of job instances.
    module ClassMethods
      # Rebuilds a job from a hash created with +serialize+. The class named
      # under the "job_class" key is instantiated without arguments, populated
      # through its instance-level +deserialize+, and returned.
      def deserialize(job_data)
        job_data["job_class"].constantize.new.tap do |job|
          job.deserialize(job_data)
        end
      end

      # Returns a ConfiguredJob wrapping this job class and the given options.
      # Call perform_later on the result, with the job arguments, to enqueue
      # the job using those preconfigured options.
      #
      # ==== Options
      # * <tt>:wait</tt> - Enqueues the job with the specified delay
      # * <tt>:wait_until</tt> - Enqueues the job at the time specified
      # * <tt>:queue</tt> - Enqueues the job on the specified queue
      # * <tt>:priority</tt> - Enqueues the job with the specified priority
      #
      # ==== Examples
      #
      #    VideoJob.set(queue: :some_queue).perform_later(Video.last)
      #    VideoJob.set(wait: 5.minutes).perform_later(Video.last)
      #    VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
      def set(options = {})
        ConfiguredJob.new(self, options)
      end
    end

    # Creates a new job instance. Takes the arguments that will be
    # passed to the perform method.
    def initialize(*arguments)
      job_class = self.class

      # Positional arguments are stored exactly as given.
      @arguments = arguments
      # Each instance gets its own freshly generated UUID.
      @job_id = SecureRandom.uuid
      # Queue name and priority start out from the class-level settings.
      @queue_name = job_class.queue_name
      @priority = job_class.priority
      # A new job has not been executed yet.
      @executions = 0
    end

    # Returns a hash with the job data that can safely be passed to the
    # queueing adapter.
    def serialize
      # +deserialize+ only stores the incoming arguments in
      # @serialized_arguments; +arguments+ is not refreshed from them until
      # deserialize_arguments_if_needed runs. If the job is serialized again
      # before that happens, the pending payload is passed through unchanged.
      # Serializing +arguments+ at that point would silently drop the job's
      # real arguments. The condition mirrors the one used by
      # deserialize_arguments_if_needed.
      argument_payload =
        if defined?(@serialized_arguments) && @serialized_arguments.present?
          @serialized_arguments
        else
          serialize_arguments(arguments)
        end

      {
        "job_class"       => self.class.name,
        "job_id"          => job_id,
        "provider_job_id" => provider_job_id,
        "queue_name"      => queue_name,
        "priority"        => priority,
        "arguments"       => argument_payload,
        "executions"      => executions,
        # The locale recorded is the one active at serialization time.
        "locale"          => I18n.locale.to_s
      }
    end

    # Attaches the stored job data to the current instance. Receives a hash
    # returned from +serialize+
    #
    # ==== Examples
    #
    #    class DeliverWebhookJob < ActiveJob::Base
    #      def serialize
    #        super.merge('attempt_number' => (@attempt_number || 0) + 1)
    #      end
    #
    #      def deserialize(job_data)
    #        super
    #        @attempt_number = job_data['attempt_number']
    #      end
    #
    #      rescue_from(Timeout::Error) do |exception|
    #        raise exception if @attempt_number > 5
    #        retry_job(wait: 10)
    #      end
    #    end
    def deserialize(job_data)
      # Identity and routing information.
      self.job_id = job_data["job_id"]
      self.provider_job_id = job_data["provider_job_id"]
      self.queue_name = job_data["queue_name"]
      self.priority = job_data["priority"]

      # The arguments are only stored here, still in their serialized form.
      self.serialized_arguments = job_data["arguments"]

      self.executions = job_data["executions"]

      # Job data without a "locale" entry falls back to the current I18n.locale.
      stored_locale = job_data["locale"]
      self.locale = stored_locale || I18n.locale.to_s
    end

    private
      # Replaces +arguments+ with the deserialized form of any pending
      # serialized arguments, then clears the pending payload so the work is
      # not repeated. Does nothing when no non-blank payload is stored.
      def deserialize_arguments_if_needed
        return unless defined?(@serialized_arguments) && @serialized_arguments.present?

        @arguments = deserialize_arguments(@serialized_arguments)
        @serialized_arguments = nil
      end

      # Converts the given (not yet serialized) job arguments by delegating
      # to Arguments.serialize and returns its result.
      def serialize_arguments(arguments)
        Arguments.serialize(arguments)
      end

      # Converts arguments from their serialized form by delegating to
      # Arguments.deserialize and returns its result.
      def deserialize_arguments(serialized_arguments)
        Arguments.deserialize(serialized_arguments)
      end
  end
end