aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/server/base.rb
blob: fe48c112df22d8e074811c727ceac75b74c7a276 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
require 'thread'

module ActionCable
  module Server
    # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the cable server, but
    # also by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers.
    #
    # Also, this is the server instance used for broadcasting. See Broadcasting for details.
    class Base
      include ActionCable::Server::Broadcasting
      include ActionCable::Server::Connections

      # Configuration object exposed on both the class and its instances.
      cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }

      # Class-level logger, read from the configuration.
      def self.logger
        config.logger
      end

      # Instance-level logger, forwarded to the configuration as well.
      delegate :logger, to: :config

      # Lock used to guard the lazily built collaborators below.
      attr_reader :mutex

      def initialize
        @mutex = Mutex.new

        # Each collaborator starts out unset and is created on first access.
        @remote_connections = nil
        @stream_event_loop = nil
        @worker_pool = nil
        @channel_classes = nil
        @pubsub = nil
      end

      # Rack entry point: calls setup_heartbeat_timer, then builds a connection
      # of the configured class for this request and processes it.
      def call(env)
        setup_heartbeat_timer

        connection = config.connection_class.new(self, env)
        connection.process
      end

      # Disconnect all the connections matching `identifiers`, on this server or any other, by going through RemoteConnections.
      def disconnect(identifiers)
        matching = remote_connections.where(identifiers)
        matching.disconnect
      end

      # Entry point to RemoteConnections; the instance is created once, under the mutex. See that class for details.
      def remote_connections
        return @remote_connections if @remote_connections

        @mutex.synchronize do
          @remote_connections ||= RemoteConnections.new(self)
        end
      end

      # The ActionCable::Connection::StreamEventLoop for this server, created once, under the mutex.
      def stream_event_loop
        return @stream_event_loop if @stream_event_loop

        @mutex.synchronize do
          @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
        end
      end

      # The worker pool for this server, created once, under the mutex. Its max size comes from config.worker_pool_size.
      def worker_pool
        return @worker_pool if @worker_pool

        @mutex.synchronize do
          @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
        end
      end

      # Requires every configured channel path, then returns a hash mapping each configured
      # channel class name to its constant. The hash is built once, under the mutex.
      def channel_classes
        return @channel_classes if @channel_classes

        @mutex.synchronize do
          @channel_classes ||= begin
            config.channel_paths.each { |path| require path }

            config.channel_class_names.each_with_object({}) do |class_name, lookup|
              lookup[class_name] = class_name.constantize
            end
          end
        end
      end

      # The pubsub adapter used for streams/broadcasting, instantiated once from config.pubsub_adapter, under the mutex.
      def pubsub
        return @pubsub if @pubsub

        @mutex.synchronize do
          @pubsub ||= config.pubsub_adapter.new(self)
        end
      end

      # The identifiers declared on the connection class configured for this server.
      def connection_identifiers
        config.connection_class.identifiers
      end
    end

    # Run the load hooks registered under :action_cable, passing them the server configuration.
    ActiveSupport.run_load_hooks(:action_cable, Base.config)
  end
end