diff options
Diffstat (limited to 'actioncable/lib/action_cable')
21 files changed, 234 insertions, 110 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 714d9887d4..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -160,7 +160,10 @@ module ActionCable action = extract_action(data) if processable_action?(action) - dispatch_action(action, data) + payload = { channel_class: self.class.name, action: action, data: data } + ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do + dispatch_action(action, data) + end else logger.error "Unable to process #{action_signature(action, data)}" end @@ -192,7 +195,11 @@ module ActionCable # the proper channel identifier marked as the recipient. def transmit(data, via: nil) logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + + payload = { channel_class: self.class.name, data: data, via: via } + ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do + connection.transmit identifier: @identifier, message: data + end end def defer_subscription_confirmation! @@ -265,8 +272,11 @@ module ActionCable def transmit_subscription_confirmation unless subscription_confirmation_sent? logger.info "#{self.class.name} is transmitting the subscription confirmation" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) - @subscription_confirmation_sent = true + + ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] + @subscription_confirmation_sent = true + end end end @@ -277,7 +287,10 @@ module ActionCable def transmit_subscription_rejection logger.info "#{self.class.name} is transmitting the subscription rejection" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + + ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] + end end end end diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb index 4c9d53b15a..8e1b2a4af0 100644 --- a/actioncable/lib/action_cable/channel/naming.rb +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -10,6 +10,7 @@ module ActionCable # # ChatChannel.channel_name # => 'chat' # Chats::AppearancesChannel.channel_name # => 'chats:appearances' + # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances' def channel_name @channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index b414255707..dab604440f 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,11 +12,42 @@ module ActionCable end module ClassMethods - # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful - # for sending a steady flow of updates to a client based off an object that was configured on subscription. - # It's an alternative to using streams if the channel is able to do the work internally. - def periodically(callback, every:) - self.periodic_timers += [ [ callback, every: every ] ] + # Periodically performs a task on the channel, like updating an online + # user counter, polling a backend for new status messages, sending + # regular "heartbeat" messages, or doing some internal work and giving + # progress updates. + # + # Pass a method name or lambda argument or provide a block to call. + # Specify the calling period in seconds using the <tt>every:</tt> + # keyword argument. + # + # periodically :transmit_progress, every: 5.seconds + # + # periodically every: 3.minutes do + # transmit action: :update_count, count: current_count + # end + # + def periodically(callback_or_method_name = nil, every:, &block) + callback = + if block_given? + raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name + block + else + case callback_or_method_name + when Proc + callback_or_method_name + when Symbol + -> { __send__ callback_or_method_name } + else + raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}" + end + end + + unless every.kind_of?(Numeric) && every > 0 + raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}" + end + + self.periodic_timers += [[ callback, every: every ]] end end @@ -27,14 +58,21 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << connection.server.event_loop.timer(options[:every]) do - connection.worker_pool.async_run_periodic_timer(self, callback) + active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every)) + end + end + + def start_periodic_timer(callback, every:) + connection.server.event_loop.timer every do + connection.worker_pool.async_invoke connection do + instance_exec(&callback) end end end def stop_periodic_timers active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.clear end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 23d7320a28..0a0a95f453 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -2,7 +2,7 @@ module ActionCable module Channel # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent. + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -19,14 +19,14 @@ module ActionCable # end # # Based on the above example, the subscribers of this channel will get whatever data is put into the, - # let's say, `comments_for_45` broadcasting as soon as it's put there. + # let's say, <tt>comments_for_45</tt> broadcasting as soon as it's put there. # # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # # If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel. - # The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE` + # The following example would subscribe to a broadcasting like <tt>comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE</tt>. # # class CommentsChannel < ApplicationCable::Channel # def subscribed @@ -46,9 +46,7 @@ module ActionCable # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -71,16 +69,21 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) + streams << [ broadcasting, handler ] connection.server.event_loop.post do - pubsub.subscribe(broadcasting, callback, lambda do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -90,8 +93,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -109,11 +115,60 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b4488265cb..cc4e0f8c8b 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -40,7 +40,7 @@ module ActionCable # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. # - # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log. + # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log. # # Pretty simple, eh? class Base @@ -48,11 +48,11 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger, :worker_pool + attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder @worker_pool = server.worker_pool @logger = new_tagged_logger @@ -67,7 +67,7 @@ module ActionCable # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. - def process # :nodoc: + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -77,20 +77,22 @@ module ActionCable end end - # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) # :nodoc: - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -115,7 +117,7 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end def on_open # :nodoc: @@ -152,7 +154,16 @@ module ActionCable attr_reader :message_buffer private + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + def handle_open + @protocol = websocket.protocol connect if respond_to?(:connect) subscribe_to_internal_channel send_welcome_message @@ -178,7 +189,7 @@ module ActionCable # Send welcome message to the internal connection monitor channel. # This ensures the connection monitor state is reset after a successful # websocket connection. - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome]) + transmit type: ActionCable::INTERNAL[:message_types][:welcome] end def allow_request_origin? diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index 9e4dbcd6e6..6f29f32ea9 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -29,7 +29,7 @@ module ActionCable attr_reader :env, :url - def initialize(env, event_target, event_loop) + def initialize(env, event_target, event_loop, protocols) @env = env @event_target = event_target @event_loop = event_loop @@ -42,7 +42,7 @@ module ActionCable @ready_state = CONNECTING # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) + @driver = ::WebSocket::Driver.rack(self, protocols: protocols) @driver.on(:open) { |e| open } @driver.on(:message) { |e| receive_message(e.data) } @@ -71,6 +71,8 @@ module ActionCable def write(data) @stream.write(data) + rescue => e + emit_error e.message end def transmit(message) @@ -109,6 +111,10 @@ module ActionCable @ready_state == OPEN end + def protocol + @driver.protocol + end + private def open return unless @ready_state == CONNECTING diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb index c9139b6858..a4bfe7db17 100644 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -3,9 +3,10 @@ require 'faye/websocket' module ActionCable module Connection class FayeClientSocket - def initialize(env, event_target, stream_event_loop) + def initialize(env, event_target, stream_event_loop, protocols) @env = env @event_target = event_target + @protocols = protocols @faye = nil end @@ -23,6 +24,10 @@ module ActionCable @faye && @faye.close end + def protocol + @faye && @faye.protocol + end + def rack_response connect @faye.rack_response @@ -31,11 +36,12 @@ module ActionCable private def connect return if @faye - @faye = Faye::WebSocket.new(@env) + @faye = Faye::WebSocket.new(@env, @protocols) @faye.on(:open) { |event| @event_target.on_open } @faye.on(:message) { |event| @event_target.on_message(event.data) } @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + @faye.on(:error) { |event| @event_target.on_error(event.message) } end end end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb index 8b70f3d84e..9c44b38bc3 100644 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -36,7 +36,7 @@ module ActionCable end def shutdown - inner.cancel + @inner.cancel end end end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 3c5d39f59a..f70d52f99b 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,7 +11,7 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] @@ -27,8 +27,6 @@ module ActionCable end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 19f2e6e918..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -30,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -38,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 2d97b28c09..0cf59091bc 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -29,7 +29,7 @@ module ActionCable def write(data) return @rack_hijack_io.write(data) if @rack_hijack_io return @stream_send.call(data) if @stream_send - rescue EOFError + rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 5aa907c2d3..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -23,13 +23,13 @@ module ActionCable end def add(data) - id_options = decode_hash(data['identifier']) - identifier = normalize_identifier(id_options) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access subscription_klass = connection.server.channel_classes[id_options[:channel]] if subscription_klass - subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options) + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) else logger.error "Subscription class not found (#{data.inspect})" end @@ -37,7 +37,7 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[normalize_identifier(data['identifier'])] + remove_subscription subscriptions[data['identifier']] end def remove_subscription(subscription) @@ -46,7 +46,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action(decode_hash(data['data'])) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) end def identifiers @@ -63,21 +63,8 @@ module ActionCable private delegate :logger, to: :connection - def normalize_identifier(identifier) - identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash) - identifier - end - - # If `data` is a Hash, this means that the original JSON - # sent by the client had no backslashes in it, and does - # not need to be decoded again. - def decode_hash(data) - data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash) - data.with_indifferent_access - end - def find(data) - if subscription = subscriptions[normalize_identifier(data['identifier'])] + if subscription = subscriptions[data['identifier']] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 0bec9b6a96..11f28c37e8 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil + def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil end def possible? @@ -24,6 +24,10 @@ module ActionCable websocket.close end + def protocol + websocket.protocol + end + def rack_response websocket.rack_response end diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index 7dc541d00c..8ce1b24962 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -4,7 +4,7 @@ require "action_cable/helpers/action_cable_helper" require "active_support/core_ext/hash/indifferent_access" module ActionCable - class Railtie < Rails::Engine # :nodoc: + class Engine < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path] diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index 67adeefaff..8ba0230d47 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -6,9 +6,9 @@ module ActionCable module VERSION MAJOR = 5 - MINOR = 0 + MINOR = 1 TINY = 0 - PRE = "beta3" + PRE = "alpha" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index aeef8abc72..a528024427 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -28,7 +28,7 @@ module ActionCable private # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>. - # Exists for the solely for the purpose of calling #disconnect on that connection. + # Exists solely for the purpose of calling #disconnect on that connection. class RemoteConnection class InvalidIdentifiersError < StandardError; end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 778f5ffeed..b1a0e11631 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -52,7 +52,17 @@ module ActionCable @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. + # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. + # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out + # at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size. + # + # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. + # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database + # connections. + # + # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe + # the db connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger + # db connection pool instead. def worker_pool @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 98025f27f2..8f93564113 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -19,27 +19,28 @@ module ActionCable # new Notification data['title'], body: data['body'] module Broadcasting # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, String(broadcasting)) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 5fe71caed2..0bb378cf03 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -14,7 +14,7 @@ module ActionCable @log_tags = [] @connection_class = ActionCable::Connection::Base - @worker_pool_size = 100 + @worker_pool_size = 4 @disable_request_forgery_protection = false end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 49cbaec0c0..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e @@ -59,18 +61,6 @@ module ActionCable end end - def async_run_periodic_timer(channel, callback) - @pool.post do - run_periodic_timer(channel, callback) - end - end - - def run_periodic_timer(channel, callback) - work(channel.connection) do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - private def logger diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index e6c862959b..4735a4bfa8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -51,7 +51,11 @@ module ActionCable @redis_connection_for_subscriptions || @server.mutex.synchronize do @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @logger.error "[ActionCable] Redis reconnect failed." + end + + redis.on(:failed) do + @logger.error "[ActionCable] Redis connection has failed." end end end |