aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/channel/base.rb25
-rw-r--r--actioncable/lib/action_cable/channel/periodic_timers.rb2
-rw-r--r--actioncable/lib/action_cable/channel/streams.rb5
-rw-r--r--actioncable/lib/action_cable/connection.rb2
-rw-r--r--actioncable/lib/action_cable/connection/base.rb17
-rw-r--r--actioncable/lib/action_cable/connection/client_socket.rb21
-rw-r--r--actioncable/lib/action_cable/connection/faye_client_socket.rb43
-rw-r--r--actioncable/lib/action_cable/connection/faye_event_loop.rb44
-rw-r--r--actioncable/lib/action_cable/connection/internal_channel.rb4
-rw-r--r--actioncable/lib/action_cable/connection/stream.rb2
-rw-r--r--actioncable/lib/action_cable/connection/stream_event_loop.rb11
-rw-r--r--actioncable/lib/action_cable/connection/subscriptions.rb25
-rw-r--r--actioncable/lib/action_cable/connection/web_socket.rb4
-rw-r--r--actioncable/lib/action_cable/engine.rb27
-rw-r--r--actioncable/lib/action_cable/helpers/action_cable_helper.rb12
-rw-r--r--actioncable/lib/action_cable/server/base.rb20
-rw-r--r--actioncable/lib/action_cable/server/broadcasting.rb2
-rw-r--r--actioncable/lib/action_cable/server/configuration.rb18
-rw-r--r--actioncable/lib/action_cable/server/connections.rb6
-rw-r--r--actioncable/lib/action_cable/server/worker.rb48
-rw-r--r--actioncable/lib/action_cable/server/worker/active_record_connection_management.rb3
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb2
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb9
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb17
25 files changed, 289 insertions, 91 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb
index 05764fe107..464d0581dd 100644
--- a/actioncable/lib/action_cable/channel/base.rb
+++ b/actioncable/lib/action_cable/channel/base.rb
@@ -160,13 +160,16 @@ module ActionCable
action = extract_action(data)
if processable_action?(action)
- dispatch_action(action, data)
+ payload = { channel_class: self.class.name, action: action, data: data }
+ ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do
+ dispatch_action(action, data)
+ end
else
logger.error "Unable to process #{action_signature(action, data)}"
end
end
- # Called by the cable connection when its cut, so the channel has a chance to cleanup with callbacks.
+ # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel # :nodoc:
run_callbacks :unsubscribe do
@@ -192,7 +195,11 @@ module ActionCable
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via }
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+
+ payload = { channel_class: self.class.name, data: data, via: via }
+ ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
+ end
end
def defer_subscription_confirmation!
@@ -265,8 +272,11 @@ module ActionCable
def transmit_subscription_confirmation
unless subscription_confirmation_sent?
logger.info "#{self.class.name} is transmitting the subscription confirmation"
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
- @subscription_confirmation_sent = true
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
+ @subscription_confirmation_sent = true
+ end
end
end
@@ -277,7 +287,10 @@ module ActionCable
def transmit_subscription_rejection
logger.info "#{self.class.name} is transmitting the subscription rejection"
- connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+
+ ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb
index 0f6e854520..b414255707 100644
--- a/actioncable/lib/action_cable/channel/periodic_timers.rb
+++ b/actioncable/lib/action_cable/channel/periodic_timers.rb
@@ -27,7 +27,7 @@ module ActionCable
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
- active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do
+ active_periodic_timers << connection.server.event_loop.timer(options[:every]) do
connection.worker_pool.async_run_periodic_timer(self, callback)
end
end
diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb
index 3e3be4cd44..23d7320a28 100644
--- a/actioncable/lib/action_cable/channel/streams.rb
+++ b/actioncable/lib/action_cable/channel/streams.rb
@@ -1,6 +1,6 @@
module ActionCable
module Channel
- # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data
+ # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data
# placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
# streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent.
#
@@ -72,13 +72,14 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
+ broadcasting = String(broadcasting)
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- Concurrent.global_io_executor.post do
+ connection.server.event_loop.post do
pubsub.subscribe(broadcasting, callback, lambda do
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb
index 902efb07e2..5f813cf8e0 100644
--- a/actioncable/lib/action_cable/connection.rb
+++ b/actioncable/lib/action_cable/connection.rb
@@ -8,6 +8,8 @@ module ActionCable
autoload :ClientSocket
autoload :Identification
autoload :InternalChannel
+ autoload :FayeClientSocket
+ autoload :FayeEventLoop
autoload :MessageBuffer
autoload :Stream
autoload :StreamEventLoop
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index 60f3ad3e06..b4488265cb 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -48,15 +48,16 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions, :logger
- delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
+ attr_reader :server, :env, :subscriptions, :logger, :worker_pool
+ delegate :event_loop, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
+ @worker_pool = server.worker_pool
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@@ -114,7 +115,7 @@ module ActionCable
end
def beat
- transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
+ transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i)
end
def on_open # :nodoc:
@@ -154,7 +155,7 @@ module ActionCable
def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
- confirm_connection_monitor_subscription
+ send_welcome_message
message_buffer.process!
server.add_connection(self)
@@ -173,11 +174,11 @@ module ActionCable
disconnect if respond_to?(:disconnect)
end
- def confirm_connection_monitor_subscription
- # Send confirmation message to the internal connection monitor channel.
+ def send_welcome_message
+ # Send welcome message to the internal connection monitor channel.
# This ensures the connection monitor state is reset after a successful
# websocket connection.
- transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], type: ActionCable::INTERNAL[:message_types][:confirmation])
+ transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome])
end
def allow_request_origin?
diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb
index d7632b05fe..7d6de78582 100644
--- a/actioncable/lib/action_cable/connection/client_socket.rb
+++ b/actioncable/lib/action_cable/connection/client_socket.rb
@@ -29,10 +29,10 @@ module ActionCable
attr_reader :env, :url
- def initialize(env, event_target, stream_event_loop)
- @env = env
- @event_target = event_target
- @stream_event_loop = stream_event_loop
+ def initialize(env, event_target, event_loop)
+ @env = env
+ @event_target = event_target
+ @event_loop = event_loop
@url = ClientSocket.determine_url(@env)
@@ -49,16 +49,17 @@ module ActionCable
@driver.on(:close) { |e| begin_close(e.reason, e.code) }
@driver.on(:error) { |e| emit_error(e.message) }
- @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self)
-
- if callback = @env['async.callback']
- callback.call([101, {}, @stream])
- end
+ @stream = ActionCable::Connection::Stream.new(@event_loop, self)
end
def start_driver
return if @driver.nil? || @driver_started
@stream.hijack_rack_socket
+
+ if callback = @env['async.callback']
+ callback.call([101, {}, @stream])
+ end
+
@driver_started = true
@driver.start
end
@@ -70,6 +71,8 @@ module ActionCable
def write(data)
@stream.write(data)
+ rescue => e
+ emit_error e.message
end
def transmit(message)
diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb
new file mode 100644
index 0000000000..47d09a9e14
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb
@@ -0,0 +1,43 @@
+require 'faye/websocket'
+
+module ActionCable
+ module Connection
+ class FayeClientSocket
+ def initialize(env, event_target, stream_event_loop)
+ @env = env
+ @event_target = event_target
+
+ @faye = nil
+ end
+
+ def alive?
+ @faye && @faye.ready_state == Faye::WebSocket::API::OPEN
+ end
+
+ def transmit(data)
+ connect
+ @faye.send data
+ end
+
+ def close
+ @faye && @faye.close
+ end
+
+ def rack_response
+ connect
+ @faye.rack_response
+ end
+
+ private
+ def connect
+ return if @faye
+ @faye = Faye::WebSocket.new(@env)
+
+ @faye.on(:open) { |event| @event_target.on_open }
+ @faye.on(:message) { |event| @event_target.on_message(event.data) }
+ @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) }
+ @faye.on(:error) { |event| @event_target.on_error(event.message) }
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb
new file mode 100644
index 0000000000..9c44b38bc3
--- /dev/null
+++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb
@@ -0,0 +1,44 @@
+require 'thread'
+
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module Connection
+ class FayeEventLoop
+ @@mutex = Mutex.new
+
+ def timer(interval, &block)
+ ensure_reactor_running
+ EMTimer.new(::EM::PeriodicTimer.new(interval, &block))
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ ensure_reactor_running
+ ::EM.next_tick(&task)
+ end
+
+ private
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+
+ class EMTimer
+ def initialize(inner)
+ @inner = inner
+ end
+
+ def shutdown
+ @inner.cancel
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb
index 27826792b3..3c5d39f59a 100644
--- a/actioncable/lib/action_cable/connection/internal_channel.rb
+++ b/actioncable/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_subscriptions ||= []
@_internal_subscriptions << [ internal_channel, callback ]
- Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) }
+ server.event_loop.post { pubsub.subscribe(internal_channel, callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_subscriptions.present?
- @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } }
+ @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } }
end
end
diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb
index 2d97b28c09..0cf59091bc 100644
--- a/actioncable/lib/action_cable/connection/stream.rb
+++ b/actioncable/lib/action_cable/connection/stream.rb
@@ -29,7 +29,7 @@ module ActionCable
def write(data)
return @rack_hijack_io.write(data) if @rack_hijack_io
return @stream_send.call(data) if @stream_send
- rescue EOFError
+ rescue EOFError, Errno::ECONNRESET
@socket_object.client_gone
end
diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb
index e6335082d2..2abad09c03 100644
--- a/actioncable/lib/action_cable/connection/stream_event_loop.rb
+++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb
@@ -11,7 +11,16 @@ module ActionCable
@todo = Queue.new
@spawn_mutex = Mutex.new
- spawn
+ end
+
+ def timer(interval, &block)
+ Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute)
+ end
+
+ def post(task = nil, &block)
+ task ||= block
+
+ Concurrent.global_io_executor << task
end
def attach(io, stream)
diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb
index 3742f248d1..5aa907c2d3 100644
--- a/actioncable/lib/action_cable/connection/subscriptions.rb
+++ b/actioncable/lib/action_cable/connection/subscriptions.rb
@@ -23,13 +23,13 @@ module ActionCable
end
def add(data)
- id_key = data['identifier']
- id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
+ id_options = decode_hash(data['identifier'])
+ identifier = normalize_identifier(id_options)
subscription_klass = connection.server.channel_classes[id_options[:channel]]
if subscription_klass
- subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
+ subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
@@ -37,7 +37,7 @@ module ActionCable
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
- remove_subscription subscriptions[data['identifier']]
+ remove_subscription subscriptions[normalize_identifier(data['identifier'])]
end
def remove_subscription(subscription)
@@ -46,7 +46,7 @@ module ActionCable
end
def perform_action(data)
- find(data).perform_action ActiveSupport::JSON.decode(data['data'])
+ find(data).perform_action(decode_hash(data['data']))
end
def identifiers
@@ -63,8 +63,21 @@ module ActionCable
private
delegate :logger, to: :connection
+ def normalize_identifier(identifier)
+ identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash)
+ identifier
+ end
+
+ # If `data` is a Hash, this means that the original JSON
+ # sent by the client had no backslashes in it, and does
+ # not need to be decoded again.
+ def decode_hash(data)
+ data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash)
+ data.with_indifferent_access
+ end
+
def find(data)
- if subscription = subscriptions[data['identifier']]
+ if subscription = subscriptions[normalize_identifier(data['identifier'])]
subscription
else
raise "Unable to find subscription with identifier: #{data['identifier']}"
diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb
index 5e89fb9b72..0bec9b6a96 100644
--- a/actioncable/lib/action_cable/connection/web_socket.rb
+++ b/actioncable/lib/action_cable/connection/web_socket.rb
@@ -4,8 +4,8 @@ module ActionCable
module Connection
# Wrap the real socket to minimize the externally-presented API
class WebSocket
- def initialize(env, event_target, stream_event_loop)
- @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil
+ def initialize(env, event_target, event_loop, client_socket_class)
+ @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil
end
def possible?
diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb
index ae0c59dccd..7dc541d00c 100644
--- a/actioncable/lib/action_cable/engine.rb
+++ b/actioncable/lib/action_cable/engine.rb
@@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access"
module ActionCable
class Railtie < Rails::Engine # :nodoc:
config.action_cable = ActiveSupport::OrderedOptions.new
- config.action_cable.mount_path = '/cable'
+ config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path]
config.eager_load_namespaces << ActionCable
@@ -51,5 +51,30 @@ module ActionCable
end
end
end
+
+ initializer "action_cable.set_work_hooks" do |app|
+ ActiveSupport.on_load(:action_cable) do
+ ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
+ app.executor.wrap do
+ # If we took a while to get the lock, we may have been halted
+ # in the meantime. As we haven't started doing any real work
+ # yet, we should pretend that we never made it off the queue.
+ unless stopping?
+ inner.call
+ end
+ end
+ end
+
+ wrap = lambda do |_, inner|
+ app.executor.wrap(&inner)
+ end
+ ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
+ ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap
+
+ app.reloader.before_class_unload do
+ ActionCable.server.restart
+ end
+ end
+ end
end
end
diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
index 200732fdcd..2081a37db6 100644
--- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb
+++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb
@@ -1,24 +1,24 @@
module ActionCable
module Helpers
module ActionCableHelper
- # Returns an "action-cable-url" meta tag with the value of the url specified in your
- # configuration. Ensure this is above your javascript tag:
+ # Returns an "action-cable-url" meta tag with the value of the URL specified in your
+ # configuration. Ensure this is above your JavaScript tag:
#
# <head>
# <%= action_cable_meta_tag %>
# <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %>
# </head>
#
- # This is then used by Action Cable to determine the url of your WebSocket server.
+ # This is then used by Action Cable to determine the URL of your WebSocket server.
# Your CoffeeScript can then connect to the server without needing to specify the
- # url directly:
+ # URL directly:
#
# #= require cable
# @App = {}
# App.cable = Cable.createConsumer()
#
- # Make sure to specify the correct server location in each of your environments
- # config file:
+ # Make sure to specify the correct server location in each of your environment
+ # config files:
#
# config.action_cable.mount_path = "/cable123"
# <%= action_cable_meta_tag %> would render:
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index c3b64299e3..778f5ffeed 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,4 +1,4 @@
-require 'thread'
+require 'monitor'
module ActionCable
module Server
@@ -18,8 +18,8 @@ module ActionCable
attr_reader :mutex
def initialize
- @mutex = Mutex.new
- @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
+ @mutex = Monitor.new
+ @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by Rack to setup the server.
@@ -33,13 +33,23 @@ module ActionCable
remote_connections.where(identifiers).disconnect
end
+ def restart
+ connections.each(&:close)
+
+ @mutex.synchronize do
+ worker_pool.halt if @worker_pool
+
+ @worker_pool = nil
+ end
+ end
+
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
- def stream_event_loop
- @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
+ def event_loop
+ @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb
index f90fe7b9e2..98025f27f2 100644
--- a/actioncable/lib/action_cable/server/broadcasting.rb
+++ b/actioncable/lib/action_cable/server/broadcasting.rb
@@ -26,7 +26,7 @@ module ActionCable
# Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that
# may need multiple spots to transmit to a specific broadcasting over and over.
def broadcaster_for(broadcasting)
- Broadcaster.new(self, broadcasting)
+ Broadcaster.new(self, String(broadcasting))
end
private
diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb
index 9a7301287c..5fe71caed2 100644
--- a/actioncable/lib/action_cable/server/configuration.rb
+++ b/actioncable/lib/action_cable/server/configuration.rb
@@ -4,7 +4,7 @@ module ActionCable
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
- attr_accessor :connection_class, :worker_pool_size
+ attr_accessor :use_faye, :connection_class, :worker_pool_size
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url, :mount_path
@@ -43,6 +43,22 @@ module ActionCable
adapter = 'PostgreSQL' if adapter == 'Postgresql'
"ActionCable::SubscriptionAdapter::#{adapter}".constantize
end
+
+ def event_loop_class
+ if use_faye
+ ActionCable::Connection::FayeEventLoop
+ else
+ ActionCable::Connection::StreamEventLoop
+ end
+ end
+
+ def client_socket_class
+ if use_faye
+ ActionCable::Connection::FayeClientSocket
+ else
+ ActionCable::Connection::ClientSocket
+ end
+ end
end
end
end
diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb
index 4dc8934b25..5e61b4e335 100644
--- a/actioncable/lib/action_cable/server/connections.rb
+++ b/actioncable/lib/action_cable/server/connections.rb
@@ -21,9 +21,9 @@ module ActionCable
# then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
- @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do
- Concurrent.global_io_executor.post { connections.map(&:beat) }
- end.tap(&:execute)
+ @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do
+ event_loop.post { connections.map(&:beat) }
+ end
end
def open_connections_statistics
diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb
index b920b880db..49cbaec0c0 100644
--- a/actioncable/lib/action_cable/server/worker.rb
+++ b/actioncable/lib/action_cable/server/worker.rb
@@ -20,6 +20,26 @@ module ActionCable
)
end
+ # Stop processing work: any work that has not already started
+ # running will be discarded from the queue
+ def halt
+ @pool.kill
+ end
+
+ def stopping?
+ @pool.shuttingdown?
+ end
+
+ def work(connection)
+ self.connection = connection
+
+ run_callbacks :work do
+ yield
+ end
+ ensure
+ self.connection = nil
+ end
+
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
@@ -27,19 +47,15 @@ module ActionCable
end
def invoke(receiver, method, *args)
- begin
- self.connection = receiver
-
- run_callbacks :work do
+ work(receiver) do
+ begin
receiver.send method, *args
- end
- rescue Exception => e
- logger.error "There was an exception - #{e.class}(#{e.message})"
- logger.error e.backtrace.join("\n")
+ rescue Exception => e
+ logger.error "There was an exception - #{e.class}(#{e.message})"
+ logger.error e.backtrace.join("\n")
- receiver.handle_exception if receiver.respond_to?(:handle_exception)
- ensure
- self.connection = nil
+ receiver.handle_exception if receiver.respond_to?(:handle_exception)
+ end
end
end
@@ -50,14 +66,8 @@ module ActionCable
end
def run_periodic_timer(channel, callback)
- begin
- self.connection = channel.connection
-
- run_callbacks :work do
- callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
- end
- ensure
- self.connection = nil
+ work(channel.connection) do
+ callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
index 1ac8934410..c1e4aa8103 100644
--- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
+++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -1,7 +1,6 @@
module ActionCable
module Server
class Worker
- # Clear active connections between units of work so that way long-running channels or connection processes do not hoard connections.
module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
@@ -13,8 +12,6 @@ module ActionCable
def with_database_connections
connection.logger.tag(ActiveRecord::Base.logger) { yield }
- ensure
- ActiveRecord::Base.clear_active_connections!
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index cca6894289..10b3ac8cd8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -5,16 +5,21 @@ module ActionCable
class Async < Inline # :nodoc:
private
def new_subscriber_map
- AsyncSubscriberMap.new
+ AsyncSubscriberMap.new(server.event_loop)
end
class AsyncSubscriberMap < SubscriberMap
+ def initialize(event_loop)
+ @event_loop = event_loop
+ super()
+ end
+
def add_subscriber(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
index e6c862959b..256876cf30 100644
--- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -51,7 +51,7 @@ module ActionCable
@redis_connection_for_subscriptions || @server.mutex.synchronize do
@redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis|
redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ @logger.error "[ActionCable] Redis reconnect failed."
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index abaeb92e54..66c7852f6e 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -42,14 +42,15 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@queue = Queue.new
@thread = Thread.new do
@@ -68,7 +69,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- Concurrent.global_io_executor << callback if callback
+ @event_loop.post(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -98,7 +99,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index ba4934a264..65434f7107 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -33,25 +33,30 @@ module ActionCable
end
def redis_connection_for_subscriptions
- ::Redis.new(@server.config.cable)
+ redis_connection
end
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts || @server.mutex.synchronize do
- @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
+ @redis_connection_for_broadcasts ||= redis_connection
end
end
+ def redis_connection
+ self.class.redis_connector.call(@server.config.cable)
+ end
+
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@@ -80,7 +85,7 @@ module ActionCable
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
- Concurrent.global_io_executor << next_callback if next_callback
+ @event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
@@ -129,7 +134,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
private