aboutsummaryrefslogtreecommitdiffstats
path: root/activejob/lib/active_job/core.rb
blob: 0528572cd0212eca055ed4b050d81b7ae8d9628b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
module ActiveJob
  # Provides general behavior that will be included into every Active Job
  # object that inherits from ActiveJob::Base.
  module Core
    extend ActiveSupport::Concern

    included do
      # Job arguments
      attr_accessor :arguments
      attr_writer :serialized_arguments

      # Timestamp when the job should be performed
      attr_accessor :scheduled_at

      # Job Identifier
      attr_accessor :job_id

      # Queue in which the job will reside.
      attr_writer :queue_name

      # ID optionally provided by adapter
      attr_accessor :provider_job_id
    end

    # These methods will be included into any Active Job object, adding
    # helpers for de/serialization and creation of job instances.
    module ClassMethods
      # Creates a new job instance from a hash created with +serialize+
      def deserialize(job_data)
        job = job_data['job_class'].constantize.new
        job.deserialize(job_data)
        job
      end

      # Creates a job preconfigured with the given options. You can call
      # perform_later with the job arguments to enqueue the job with the
      # preconfigured options
      #
      # ==== Options
      # * <tt>:wait</tt> - Enqueues the job with the specified delay
      # * <tt>:wait_until</tt> - Enqueues the job at the time specified
      # * <tt>:queue</tt> - Enqueues the job on the specified queue
      #
      # ==== Examples
      #
      #    VideoJob.set(queue: :some_queue).perform_later(Video.last)
      #    VideoJob.set(wait: 5.minutes).perform_later(Video.last)
      #    VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
      def set(options={})
        ConfiguredJob.new(self, options)
      end
    end

    # Creates a new job instance. Takes the arguments that will be
    # passed to the perform method.
    def initialize(*arguments)
      @arguments  = arguments
      @job_id     = SecureRandom.uuid
      @queue_name = self.class.queue_name
    end

    # Returns a hash with the job data that can safely be passed to the
    # queueing adapter.
    def serialize
      {
        'job_class'  => self.class.name,
        'job_id'     => job_id,
        'queue_name' => queue_name,
        'arguments'  => serialize_arguments(arguments)
      }
    end

    # Attaches the stored job data to the current instance. Receives a hash
    # returned from +serialize+
    #
    # ==== Examples
    #
    #    class DeliverWebhookJob < ActiveJob::Base
    #      def serialize
    #        super.merge('attempt_number' => (@attempt_number || 0) + 1)
    #      end
    #
    #      def deserialize(job_data)
    #        super
    #        @attempt_number = job_data['attempt_number']
    #      end
    #
    #      rescue_from(TimeoutError) do |exception|
    #        raise exception if @attempt_number > 5
    #        retry_job(wait: 10)
    #      end
    #    end
    def deserialize(job_data)
      self.job_id               = job_data['job_id']
      self.queue_name           = job_data['queue_name']
      self.serialized_arguments = job_data['arguments']
    end

    private
      def deserialize_arguments_if_needed
        if defined?(@serialized_arguments) && @serialized_arguments.present?
          @arguments = deserialize_arguments(@serialized_arguments)
          @serialized_arguments = nil
        end
      end

      def serialize_arguments(serialized_args)
        Arguments.serialize(serialized_args)
      end

      def deserialize_arguments(serialized_args)
        Arguments.deserialize(serialized_args)
      end
  end
end