aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb23
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb52
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb81
-rw-r--r--actioncable/lib/action_cable/connection.rb2
-rw-r--r--actioncable/lib/action_cable/connection/base.rb47
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb18
-rw-r--r--actioncable/lib/action_cable/connection/faye_client_socket.rb48
-rw-r--r--actioncable/lib/action_cable/connection/faye_event_loop.rb44
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb8
-rw-r--r--actioncable/lib/action_cable/connection/message_buffer.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb2
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb11
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb25
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb8
-rw-r--r--actioncable/lib/action_cable/engine.rb2
-rw-r--r--actioncable/lib/action_cable/server/base.rb22
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb19
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb20
-rw-r--r--actioncable/lib/action_cable/server/connections.rb6
-rw-r--r--actioncable/lib/action_cable/server/worker.rb30
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb2
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb9
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb9
24 files changed, 371 insertions, 132 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 714d9887d4..845b747fc5 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -160,7 +160,10 @@ module ActionCable
action = extract_action(data)
if processable_action?(action)
- dispatch_action(action, data)
+ payload = { channel_class: self.class.name, action: action, data: data }
+ ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
+ dispatch_action(action, data)
+ end
else
logger.error "Unable to process #{action_signature(action, data)}"
end
@@ -192,7 +195,11 @@ module ActionCable
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via }
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+
+ payload = { channel_class: self.class.name, data: data, via: via }
+ ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
+ connection.transmit identifier: @identifier, message: data
+ end
end
def defer_subscription_confirmation!
@@ -265,8 +272,11 @@ module ActionCable
def transmit_subscription_confirmation
unless subscription_confirmation_sent?
logger.info "#{self.class.name} is transmitting the subscription confirmation"
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
- @subscription_confirmation_sent = true
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
+ @subscription_confirmation_sent = true
+ end
end
end
@@ -277,7 +287,10 @@ module ActionCable
def transmit_subscription_rejection
logger.info "#{self.class.name} is transmitting the subscription rejection"
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
+ connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 0f6e854520..dab604440f 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -12,11 +12,42 @@ module ActionCable
end
module ClassMethods
- # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
- # for sending a steady flow of updates to a client based off an object that was configured on subscription.
- # It's an alternative to using streams if the channel is able to do the work internally.
- def periodically(callback, every:)
- self.periodic_timers += [ [ callback, every: every ] ]
+ # Periodically performs a task on the channel, like updating an online
+ # user counter, polling a backend for new status messages, sending
+ # regular "heartbeat" messages, or doing some internal work and giving
+ # progress updates.
+ #
+ # Pass a method name or lambda argument or provide a block to call.
+ # Specify the calling period in seconds using the <tt>every:</tt>
+ # keyword argument.
+ #
+ # periodically :transmit_progress, every: 5.seconds
+ #
+ # periodically every: 3.minutes do
+ # transmit action: :update_count, count: current_count
+ # end
+ #
+ def periodically(callback_or_method_name = nil, every:, &block)
+ callback =
+ if block_given?
+ raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name
+ block
+ else
+ case callback_or_method_name
+ when Proc
+ callback_or_method_name
+ when Symbol
+ -> { __send__ callback_or_method_name }
+ else
+ raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}"
+ end
+ end
+
+ unless every.kind_of?(Numeric) && every > 0
+ raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
+ end
+
+ self.periodic_timers += [[ callback, every: every ]]
end
end
@@ -27,14 +58,21 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
- connection.worker_pool.async_run_periodic_timer(self, callback)
+ active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every))
+ end
+ end
+
+ def start_periodic_timer(callback, every:)
+ connection.server.event_loop.timer every do
+ connection.worker_pool.async_invoke connection do
+ instance_exec(&callback)
end
end
end
def stop_periodic_timers
active_periodic_timers.each { |timer| timer.shutdown }
+ active_periodic_timers.clear
end
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 431a5c1063..200c9d053c 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -2,7 +2,7 @@ module ActionCable
module Channel
# Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
# placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
- # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent.
+ # streaming a broadcasting at the very moment it sends out an update, you will not get that update, even if you connect after it has been sent.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
# the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
@@ -46,9 +46,7 @@ module ActionCable
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
- # stream_for @room, -> (encoded_message) do
- # message = ActiveSupport::JSON.decode(encoded_message)
- #
+ # stream_for @room, coder: ActiveSupport::JSON do |message|
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
@@ -71,16 +69,21 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
- def stream_from(broadcasting, callback = nil)
+ # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
+ # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ def stream_from(broadcasting, callback = nil, coder: nil, &block)
broadcasting = String(broadcasting)
+
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
- callback ||= default_stream_callback(broadcasting)
- streams << [ broadcasting, callback ]
+ # Build a stream handler by wrapping the user-provided callback with
+ # a decoder or defaulting to a JSON-decoding retransmitter.
+ handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
+ streams << [ broadcasting, handler ]
- Concurrent.global_io_executor.post do
- pubsub.subscribe(broadcasting, callback, lambda do
+ connection.server.event_loop.post do
+ pubsub.subscribe(broadcasting, handler, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end)
@@ -90,8 +93,11 @@ module ActionCable
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
- def stream_for(model, callback = nil)
- stream_from(broadcasting_for([ channel_name, model ]), callback)
+ #
+ # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
+ # Defaults to `coder: nil` which does no decoding, passes raw messages.
+ def stream_for(model, callback = nil, coder: nil, &block)
+ stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
end
# Unsubscribes all streams associated with this channel from the pubsub queue.
@@ -109,11 +115,60 @@ module ActionCable
@_streams ||= []
end
- def default_stream_callback(broadcasting)
+ # Always wrap the outermost handler to invoke the user handler on the
+ # worker pool rather than blocking the event loop.
+ def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
+ handler = stream_handler(broadcasting, user_handler, coder: coder)
+
+ -> message do
+ connection.worker_pool.async_invoke handler, :call, message, connection: connection
+ end
+ end
+
+ # May be overridden to add instrumentation, logging, specialized error
+ # handling, or other forms of handler decoration.
+ #
+ # TODO: Tests demonstrating this.
+ def stream_handler(broadcasting, user_handler, coder: nil)
+ if user_handler
+ stream_decoder user_handler, coder: coder
+ else
+ default_stream_handler broadcasting, coder: coder
+ end
+ end
+
+ # May be overridden to change the default stream handling behavior
+ # which decodes JSON and transmits to client.
+ #
+ # TODO: Tests demonstrating this.
+ #
+ # TODO: Room for optimization. Update transmit API to be coder-aware
+ # so we can no-op when pubsub and connection are both JSON-encoded.
+ # Then we can skip decode+encode if we're just proxying messages.
+ def default_stream_handler(broadcasting, coder:)
+ coder ||= ActiveSupport::JSON
+ stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
+ end
+
+ def stream_decoder(handler = identity_handler, coder:)
+ if coder
+ -> message { handler.(coder.decode(message)) }
+ else
+ handler
+ end
+ end
+
+ def stream_transmitter(handler = identity_handler, broadcasting:)
+ via = "streamed from #{broadcasting}"
+
-> (message) do
- transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
+ transmit handler.(message), via: via
end
end
+
+ def identity_handler
+ -> message { message }
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index 902efb07e2..5f813cf8e0 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -8,6 +8,8 @@ module ActionCable
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
+ autoload :FayeClientSocket
+ autoload :FayeEventLoop
autoload :MessageBuffer
autoload :Stream
autoload :StreamEventLoop
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index f34f5eb109..cc4e0f8c8b 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -40,7 +40,7 @@ module ActionCable
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
- # Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
+ # Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
@@ -48,16 +48,16 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger, :worker_pool
- delegate :stream_event_loop, :pubsub, to: :server
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
+ delegate :event_loop, :pubsub, to: :server
- def initialize(server, env)
- @server, @env = server, env
+ def initialize(server, env, coder: ActiveSupport::JSON)
+ @server, @env, @coder = server, env, coder
@worker_pool = server.worker_pool
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -67,7 +67,7 @@ module ActionCable
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
- def process # :nodoc:
+ def process #:nodoc:
logger.info started_request_message
if websocket.possible? && allow_request_origin?
@@ -77,20 +77,22 @@ module ActionCable
end
end
- # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded.
- # The data is routed to the proper channel that the connection has subscribed to.
- def receive(data_in_json)
+ # Decodes WebSocket messages and dispatches them to subscribed channels.
+ # WebSocket message transfer encoding is always JSON.
+ def receive(websocket_message) #:nodoc:
+ send_async :dispatch_websocket_message, websocket_message
+ end
+
+ def dispatch_websocket_message(websocket_message) #:nodoc:
if websocket.alive?
- subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
+ subscriptions.execute_command decode(websocket_message)
else
- logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
+ logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
end
end
- # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
- # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
- def transmit(data) # :nodoc:
- websocket.transmit data
+ def transmit(cable_message) # :nodoc:
+ websocket.transmit encode(cable_message)
end
# Close the WebSocket connection.
@@ -115,7 +117,7 @@ module ActionCable
end
def beat
- transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i)
+ transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end
def on_open # :nodoc:
@@ -152,7 +154,16 @@ module ActionCable
attr_reader :message_buffer
private
+ def encode(cable_message)
+ @coder.encode cable_message
+ end
+
+ def decode(websocket_message)
+ @coder.decode websocket_message
+ end
+
def handle_open
+ @protocol = websocket.protocol
connect if respond_to?(:connect)
subscribe_to_internal_channel
send_welcome_message
@@ -178,7 +189,7 @@ module ActionCable
# Send welcome message to the internal connection monitor channel.
# This ensures the connection monitor state is reset after a successful
# websocket connection.
- transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome])
+ transmit type: ActionCable::INTERNAL[:message_types][:welcome]
end
def allow_request_origin?
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index f6b11e93f0..6f29f32ea9 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -29,10 +29,10 @@ module ActionCable
attr_reader :env, :url
- def initialize(env, event_target, stream_event_loop)
- @env = env
- @event_target = event_target
- @stream_event_loop = stream_event_loop
+ def initialize(env, event_target, event_loop, protocols)
+ @env = env
+ @event_target = event_target
+ @event_loop = event_loop
@url = ClientSocket.determine_url(@env)
@@ -42,14 +42,14 @@ module ActionCable
@ready_state = CONNECTING
# The driver calls +env+, +url+, and +write+
- @driver = ::WebSocket::Driver.rack(self)
+ @driver = ::WebSocket::Driver.rack(self, protocols: protocols)
@driver.on(:open) { |e| open }
@driver.on(:message) { |e| receive_message(e.data) }
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
- @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
+ @stream = ActionCable::Connection::Stream.new(@event_loop, self)
end
def start_driver
@@ -71,6 +71,8 @@ module ActionCable
def write(data)
@stream.write(data)
+ rescue => e
+ emit_error e.message
end
def transmit(message)
@@ -109,6 +111,10 @@ module ActionCable
@ready_state == OPEN
end
+ def protocol
+ @driver.protocol
+ end
+
private
def open
return unless @ready_state == CONNECTING
diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb
new file mode 100644
index 0000000000..a4bfe7db17
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb
@@ -0,0 +1,48 @@
+require 'faye/websocket'
+
+module ActionCable
+ module Connection
+ class FayeClientSocket
+ def initialize(env, event_target, stream_event_loop, protocols)
+ @env = env
+ @event_target = event_target
+ @protocols = protocols
+
+ @faye = nil
+ end
+
+ def alive?
+ @faye && @faye.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ connect
+ @faye.send data
+ end
+
+ def close
+ @faye && @faye.close
+ end
+
+ def protocol
+ @faye && @faye.protocol
+ end
+
+ def rack_response
+ connect
+ @faye.rack_response
+ end
+
+ private
+ def connect
+ return if @faye
+ @faye = Faye::WebSocket.new(@env, @protocols)
+
+ @faye.on(:open) { |event| @event_target.on_open }
+ @faye.on(:message) { |event| @event_target.on_message(event.data) }
+ @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
+ @faye.on(:error) { |event| @event_target.on_error(event.message) }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb
new file mode 100644
index 0000000000..9c44b38bc3
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb
@@ -0,0 +1,44 @@
+require 'thread'
+
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module Connection
+ class FayeEventLoop
+ @@mutex = Mutex.new
+
+ def timer(interval, &block)
+ ensure_reactor_running
+ EMTimer.new(::EM::PeriodicTimer.new(interval, &block))
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ ensure_reactor_running
+ ::EM.next_tick(&task)
+ end
+
+ private
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+
+ class EMTimer
+ def initialize(inner)
+ @inner = inner
+ end
+
+ def shutdown
+ @inner.cancel
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 27826792b3..f70d52f99b 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -11,24 +11,22 @@ module ActionCable
def subscribe_to_internal_channel
if connection_identifier.present?
- callback = -> (message) { process_internal_message(message) }
+ callback = -> (message) { process_internal_message decode(message) }
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
+ server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } }
end
end
def process_internal_message(message)
- message = ActiveSupport::JSON.decode(message)
-
case message['type']
when 'disconnect'
logger.info "Removing connection (#{connection_identifier})"
diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb
index 19f2e6e918..6a80770cae 100644
--- a/actioncable/lib/action_cable/connection/message_buffer.rb
+++ b/actioncable/lib/action_cable/connection/message_buffer.rb
@@ -30,7 +30,7 @@ module ActionCable
protected
attr_reader :connection
- attr_accessor :buffered_messages
+ attr_reader :buffered_messages
private
def valid?(message)
@@ -38,7 +38,7 @@ module ActionCable
end
def receive(message)
- connection.send_async :receive, message
+ connection.receive message
end
def buffer(message)
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 2d97b28c09..0cf59091bc 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -29,7 +29,7 @@ module ActionCable
def write(data)
return @rack_hijack_io.write(data) if @rack_hijack_io
return @stream_send.call(data) if @stream_send
- rescue EOFError
+ rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index e6335082d2..2abad09c03 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -11,7 +11,16 @@ module ActionCable
@todo = Queue.new
@spawn_mutex = Mutex.new
- spawn
+ end
+
+ def timer(interval, &block)
+ Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ Concurrent.global_io_executor << task
end
def attach(io, stream)
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 5aa907c2d3..3742f248d1 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -23,13 +23,13 @@ module ActionCable
end
def add(data)
- id_options = decode_hash(data['identifier'])
- identifier = normalize_identifier(id_options)
+ id_key = data['identifier']
+ id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = connection.server.channel_classes[id_options[:channel]]
if subscription_klass
- subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options)
+ subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
@@ -37,7 +37,7 @@ module ActionCable
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
- remove_subscription subscriptions[normalize_identifier(data['identifier'])]
+ remove_subscription subscriptions[data['identifier']]
end
def remove_subscription(subscription)
@@ -46,7 +46,7 @@ module ActionCable
end
def perform_action(data)
- find(data).perform_action(decode_hash(data['data']))
+ find(data).perform_action ActiveSupport::JSON.decode(data['data'])
end
def identifiers
@@ -63,21 +63,8 @@ module ActionCable
private
delegate :logger, to: :connection
- def normalize_identifier(identifier)
- identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash)
- identifier
- end
-
- # If `data` is a Hash, this means that the original JSON
- # sent by the client had no backslashes in it, and does
- # not need to be decoded again.
- def decode_hash(data)
- data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash)
- data.with_indifferent_access
- end
-
def find(data)
- if subscription = subscriptions[normalize_identifier(data['identifier'])]
+ if subscription = subscriptions[data['identifier']]
subscription
else
raise "Unable to find subscription with identifier: #{data['identifier']}"
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 5e89fb9b72..11f28c37e8 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
- def initialize(env, event_target, stream_event_loop)
- @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
+ def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols])
+ @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil
end
def possible?
@@ -24,6 +24,10 @@ module ActionCable
websocket.close
end
+ def protocol
+ websocket.protocol
+ end
+
def rack_response
websocket.rack_response
end
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index c90aadaf2c..7dc541d00c 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access"
module ActionCable
class Railtie < Rails::Engine # :nodoc:
config.action_cable = ActiveSupport::OrderedOptions.new
- config.action_cable.mount_path = '/cable'
+ config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path]
config.eager_load_namespaces << ActionCable
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index d9a2653cc2..b1a0e11631 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,4 +1,4 @@
-require 'thread'
+require 'monitor'
module ActionCable
module Server
@@ -18,8 +18,8 @@ module ActionCable
attr_reader :mutex
def initialize
- @mutex = Mutex.new
- @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
+ @mutex = Monitor.new
+ @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by Rack to setup the server.
@@ -48,11 +48,21 @@ module ActionCable
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
- def stream_event_loop
- @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
+ def event_loop
+ @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new }
end
- # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
+ # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread.
+ # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out
+ # at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size.
+ #
+ # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool.
+ # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database
+ # connections.
+ #
+ # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe
+ # the db connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger
+ # db connection pool instead.
def worker_pool
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index 98025f27f2..8f93564113 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -19,27 +19,28 @@ module ActionCable
# new Notification data['title'], body: data['body']
module Broadcasting
# Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded.
- def broadcast(broadcasting, message)
- broadcaster_for(broadcasting).broadcast(message)
+ def broadcast(broadcasting, message, coder: ActiveSupport::JSON)
+ broadcaster_for(broadcasting, coder: coder).broadcast(message)
end
# Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that
# may need multiple spots to transmit to a specific broadcasting over and over.
- def broadcaster_for(broadcasting)
- Broadcaster.new(self, String(broadcasting))
+ def broadcaster_for(broadcasting, coder: ActiveSupport::JSON)
+ Broadcaster.new(self, String(broadcasting), coder: coder)
end
private
class Broadcaster
- attr_reader :server, :broadcasting
+ attr_reader :server, :broadcasting, :coder
- def initialize(server, broadcasting)
- @server, @broadcasting = server, broadcasting
+ def initialize(server, broadcasting, coder:)
+ @server, @broadcasting, @coder = server, broadcasting, coder
end
def broadcast(message)
- server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
+ server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}"
+ encoded = coder ? coder.encode(message) : message
+ server.pubsub.broadcast broadcasting, encoded
end
end
end
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 9a7301287c..0bb378cf03 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -4,7 +4,7 @@ module ActionCable
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
- attr_accessor :connection_class, :worker_pool_size
+ attr_accessor :use_faye, :connection_class, :worker_pool_size
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url, :mount_path
@@ -14,7 +14,7 @@ module ActionCable
@log_tags = []
@connection_class = ActionCable::Connection::Base
- @worker_pool_size = 100
+ @worker_pool_size = 4
@disable_request_forgery_protection = false
end
@@ -43,6 +43,22 @@ module ActionCable
adapter = 'PostgreSQL' if adapter == 'Postgresql'
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
+
+ def event_loop_class
+ if use_faye
+ ActionCable::Connection::FayeEventLoop
+ else
+ ActionCable::Connection::StreamEventLoop
+ end
+ end
+
+ def client_socket_class
+ if use_faye
+ ActionCable::Connection::FayeClientSocket
+ else
+ ActionCable::Connection::ClientSocket
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 4dc8934b25..5e61b4e335 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -21,9 +21,9 @@ module ActionCable
# then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
- Concurrent.global_io_executor.post { connections.map(&:beat) }
- end.tap(&:execute)
+ @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do
+ event_loop.post { connections.map(&:beat) }
+ end
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index 49cbaec0c0..a638ff72e7 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -12,8 +12,10 @@ module ActionCable
define_callbacks :work
include ActiveRecordConnectionManagement
+ attr_reader :executor
+
def initialize(max_size: 5)
- @pool = Concurrent::ThreadPoolExecutor.new(
+ @executor = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
@@ -23,11 +25,11 @@ module ActionCable
# Stop processing work: any work that has not already started
# running will be discarded from the queue
def halt
- @pool.kill
+ @executor.kill
end
def stopping?
- @pool.shuttingdown?
+ @executor.shuttingdown?
end
def work(connection)
@@ -40,14 +42,14 @@ module ActionCable
self.connection = nil
end
- def async_invoke(receiver, method, *args)
- @pool.post do
- invoke(receiver, method, *args)
+ def async_invoke(receiver, method, *args, connection: receiver)
+ @executor.post do
+ invoke(receiver, method, *args, connection: connection)
end
end
- def invoke(receiver, method, *args)
- work(receiver) do
+ def invoke(receiver, method, *args, connection:)
+ work(connection) do
begin
receiver.send method, *args
rescue Exception => e
@@ -59,18 +61,6 @@ module ActionCable
end
end
- def async_run_periodic_timer(channel, callback)
- @pool.post do
- run_periodic_timer(channel, callback)
- end
- end
-
- def run_periodic_timer(channel, callback)
- work(channel.connection) do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
- end
- end
-
private
def logger
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index cca6894289..10b3ac8cd8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -5,16 +5,21 @@ module ActionCable
class Async < Inline # :nodoc:
private
def new_subscriber_map
- AsyncSubscriberMap.new
+ AsyncSubscriberMap.new(server.event_loop)
end
class AsyncSubscriberMap < SubscriberMap
+ def initialize(event_loop)
+ @event_loop = event_loop
+ super()
+ end
+
def add_subscriber(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
index e6c862959b..256876cf30 100644
--- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -51,7 +51,7 @@ module ActionCable
@redis_connection_for_subscriptions || @server.mutex.synchronize do
@redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis|
redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ @logger.error "[ActionCable] Redis reconnect failed."
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index abaeb92e54..66c7852f6e 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -42,14 +42,15 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@queue = Queue.new
@thread = Thread.new do
@@ -68,7 +69,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- Concurrent.global_io_executor << callback if callback
+ @event_loop.post(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -98,7 +99,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 6b4236e7d3..65434f7107 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -38,7 +38,7 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def redis_connection_for_broadcasts
@@ -52,10 +52,11 @@ module ActionCable
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@@ -84,7 +85,7 @@ module ActionCable
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
- Concurrent.global_io_executor << next_callback if next_callback
+ @event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
@@ -133,7 +134,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
private