aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/server/worker.rb
blob: 49cbaec0c0485753a176a6675941bf2497da82b5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
require 'active_support/callbacks'
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent'

module ActionCable
  module Server
    # Performs connection work on a pool of background threads; this is the
    # worker that Server.send_async hands its work to.
    class Worker # :nodoc:
      include ActiveSupport::Callbacks

      # Thread-local accessor for the connection currently being worked on.
      thread_mattr_accessor :connection
      define_callbacks :work
      # NOTE(review): kept after define_callbacks on purpose — the included
      # module presumably registers :work callbacks; confirm before reordering.
      include ActiveRecordConnectionManagement

      # Builds the backing thread pool.
      #
      # max_size - upper bound on the number of pool threads (default 5).
      #
      # max_queue is 0, which concurrent-ruby documents as an unbounded queue.
      def initialize(max_size: 5)
        @pool = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: max_size, max_queue: 0)
      end

      # Stop processing work: anything still waiting in the queue that has
      # not begun running is thrown away.
      def halt
        @pool.kill
      end

      # True while the underlying pool reports that it is shutting down.
      def stopping?
        @pool.shuttingdown?
      end

      # Runs the given block inside the :work callbacks with +connection+
      # published as the current thread's connection.
      #
      # The thread-local is reset to nil afterwards, even when the block or a
      # callback raises.
      def work(connection)
        self.connection = connection

        run_callbacks(:work) { yield }
      ensure
        self.connection = nil
      end

      # Queues a call to #invoke on the thread pool.
      def async_invoke(receiver, method, *args)
        @pool.post { invoke(receiver, method, *args) }
      end

      # Sends +method+ (with +args+) to +receiver+ inside #work, using the
      # receiver itself as the current connection.
      #
      # Every exception raised by the call (Exception, not only StandardError)
      # is caught here: it is logged, and the receiver's +handle_exception+ is
      # called when it defines one. Nothing is re-raised.
      def invoke(receiver, method, *args)
        work(receiver) do
          begin
            receiver.send(method, *args)
          rescue Exception => error
            log_exception(error)

            receiver.handle_exception if receiver.respond_to?(:handle_exception)
          end
        end
      end

      # Queues a call to #run_periodic_timer on the thread pool.
      def async_run_periodic_timer(channel, callback)
        @pool.post { run_periodic_timer(channel, callback) }
      end

      # Runs a periodic-timer callback for +channel+ inside #work, using the
      # channel's connection as the current connection.
      #
      # A callable +callback+ is evaluated in the context of the channel;
      # anything else is treated as a method name and sent to the channel.
      def run_periodic_timer(channel, callback)
        work(channel.connection) do
          if callback.respond_to?(:call)
            channel.instance_exec(&callback)
          else
            channel.send(callback)
          end
        end
      end

      private

        # Logger of the running Action Cable server.
        def logger
          ActionCable.server.logger
        end

        # Writes the exception summary and its backtrace to the error log.
        def log_exception(error)
          logger.error "There was an exception - #{error.class}(#{error.message})"
          logger.error error.backtrace.join("\n")
        end
    end
  end
end