aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/action_cable.rb42
-rw-r--r--lib/action_cable/channel.rb16
-rw-r--r--lib/action_cable/channel/base.rb84
-rw-r--r--lib/action_cable/channel/callbacks.rb37
-rw-r--r--lib/action_cable/channel/periodic_timers.rb6
-rw-r--r--lib/action_cable/channel/streams.rb14
-rw-r--r--lib/action_cable/connection.rb21
-rw-r--r--lib/action_cable/connection/base.rb67
-rw-r--r--lib/action_cable/connection/heartbeat.rb30
-rw-r--r--lib/action_cable/connection/identification.rb16
-rw-r--r--lib/action_cable/connection/internal_channel.rb4
-rw-r--r--lib/action_cable/connection/message_buffer.rb4
-rw-r--r--lib/action_cable/connection/subscriptions.rb10
-rw-r--r--lib/action_cable/connection/tagged_logger_proxy.rb13
-rw-r--r--lib/action_cable/connection/web_socket.rb2
-rw-r--r--lib/action_cable/engine.rb18
-rw-r--r--lib/action_cable/process/logging.rb4
-rw-r--r--lib/action_cable/server.rb20
-rw-r--r--lib/action_cable/server/base.rb5
-rw-r--r--lib/action_cable/server/broadcasting.rb6
-rw-r--r--lib/action_cable/server/configuration.rb5
-rw-r--r--lib/action_cable/server/connections.rb15
-rw-r--r--lib/action_cable/server/worker.rb12
-rw-r--r--lib/action_cable/server/worker/active_record_connection_management.rb (renamed from lib/action_cable/server/worker/clear_database_connections.rb)4
-rw-r--r--lib/assets/javascripts/cable.coffee.erb (renamed from lib/assets/javascripts/cable.js.coffee)2
-rw-r--r--lib/assets/javascripts/cable/connection.coffee84
-rw-r--r--lib/assets/javascripts/cable/connection.js.coffee77
-rw-r--r--lib/assets/javascripts/cable/connection_monitor.coffee (renamed from lib/assets/javascripts/cable/connection_monitor.js.coffee)42
-rw-r--r--lib/assets/javascripts/cable/consumer.coffee (renamed from lib/assets/javascripts/cable/consumer.js.coffee)0
-rw-r--r--lib/assets/javascripts/cable/subscription.coffee (renamed from lib/assets/javascripts/cable/subscription.js.coffee)0
-rw-r--r--lib/assets/javascripts/cable/subscriptions.coffee (renamed from lib/assets/javascripts/cable/subscriptions.js.coffee)38
31 files changed, 445 insertions, 253 deletions
diff --git a/lib/action_cable.rb b/lib/action_cable.rb
index 13c5c77578..3919812161 100644
--- a/lib/action_cable.rb
+++ b/lib/action_cable.rb
@@ -1,33 +1,31 @@
-require 'eventmachine'
-EM.epoll
-
-require 'set'
-
require 'active_support'
-require 'active_support/json'
-require 'active_support/concern'
-require 'active_support/core_ext/hash/indifferent_access'
-require 'active_support/core_ext/module/delegation'
-require 'active_support/callbacks'
-
-require 'faye/websocket'
-require 'celluloid'
-require 'em-hiredis'
-require 'redis'
-
-require 'action_cable/engine' if defined?(Rails)
+require 'active_support/rails'
require 'action_cable/version'
module ActionCable
- autoload :Server, 'action_cable/server'
- autoload :Connection, 'action_cable/connection'
- autoload :Channel, 'action_cable/channel'
+ extend ActiveSupport::Autoload
- autoload :RemoteConnections, 'action_cable/remote_connections'
- autoload :Broadcaster, 'action_cable/broadcaster'
+ INTERNAL = {
+ identifiers: {
+ ping: '_ping'.freeze
+ },
+ message_types: {
+ confirmation: 'confirm_subscription'.freeze,
+ rejection: 'reject_subscription'.freeze
+ }
+ }
# Singleton instance of the server
module_function def server
@server ||= ActionCable::Server::Base.new
end
+
+ eager_autoload do
+ autoload :Server
+ autoload :Connection
+ autoload :Channel
+ autoload :RemoteConnections
+ end
end
+
+require 'action_cable/engine' if defined?(Rails)
diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb
index 3b973ba0a7..7ae262ce5f 100644
--- a/lib/action_cable/channel.rb
+++ b/lib/action_cable/channel.rb
@@ -1,10 +1,14 @@
module ActionCable
module Channel
- autoload :Base, 'action_cable/channel/base'
- autoload :Broadcasting, 'action_cable/channel/broadcasting'
- autoload :Callbacks, 'action_cable/channel/callbacks'
- autoload :Naming, 'action_cable/channel/naming'
- autoload :PeriodicTimers, 'action_cable/channel/periodic_timers'
- autoload :Streams, 'action_cable/channel/streams'
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Callbacks
+ autoload :Naming
+ autoload :PeriodicTimers
+ autoload :Streams
+ end
end
end
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb
index 2f1b4a187d..ca903a810d 100644
--- a/lib/action_cable/channel/base.rb
+++ b/lib/action_cable/channel/base.rb
@@ -1,6 +1,8 @@
+require 'set'
+
module ActionCable
module Channel
- # The channel provides the basic structure of grouping behavior into logical units when communicating over the websocket connection.
+ # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
# You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
# responding to the subscriber's direct requests.
#
@@ -64,6 +66,22 @@ module ActionCable
#
# Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection.
# All such identifiers will automatically create a delegation method of the same name on the channel instance.
+ #
+ # == Rejecting subscription requests
+ #
+ # A channel can reject a subscription request in the #subscribed callback by invoking #reject!
+ #
+ # Example:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # reject unless current_user.can_access?(@room)
+ # end
+ # end
+ #
+ # In this example, the subscription will be rejected if the current_user does not have access to the chat room.
+ # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request.
class Base
include Callbacks
include PeriodicTimers
@@ -71,10 +89,7 @@ module ActionCable
include Naming
include Broadcasting
- on_subscribe :subscribed
- on_unsubscribe :unsubscribed
-
- attr_reader :params, :connection
+ attr_reader :params, :connection, :identifier
delegate :logger, to: :connection
class << self
@@ -118,6 +133,10 @@ module ActionCable
@identifier = identifier
@params = params
+ # When a channel is streaming via redis pubsub, we want to delay the confirmation
+ # transmission until redis pubsub subscription is confirmed.
+ @defer_subscription_confirmation = false
+
delegate_connection_identifiers
subscribe_to_channel
end
@@ -138,8 +157,9 @@ module ActionCable
# Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel
- run_unsubscribe_callbacks
- logger.info "#{self.class.name} unsubscribed"
+ run_callbacks :unsubscribe do
+ unsubscribed
+ end
end
@@ -160,9 +180,28 @@ module ActionCable
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
- connection.transmit({ identifier: @identifier, message: data }.to_json)
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
end
+ def defer_subscription_confirmation!
+ @defer_subscription_confirmation = true
+ end
+
+ def defer_subscription_confirmation?
+ @defer_subscription_confirmation
+ end
+
+ def subscription_confirmation_sent?
+ @subscription_confirmation_sent
+ end
+
+ def reject
+ @reject_subscription = true
+ end
+
+ def subscription_rejected?
+ @reject_subscription
+ end
private
def delegate_connection_identifiers
@@ -175,8 +214,15 @@ module ActionCable
def subscribe_to_channel
- logger.info "#{self.class.name} subscribing"
- run_subscribe_callbacks
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ if subscription_rejected?
+ reject_subscription
+ else
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
end
@@ -206,12 +252,22 @@ module ActionCable
end
end
- def run_subscribe_callbacks
- self.class.on_subscribe_callbacks.each { |callback| send(callback) }
+ def transmit_subscription_confirmation
+ unless subscription_confirmation_sent?
+ logger.info "#{self.class.name} is transmitting the subscription confirmation"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
+ @subscription_confirmation_sent = true
+ end
+ end
+
+ def reject_subscription
+ connection.subscriptions.remove_subscription self
+ transmit_subscription_rejection
end
- def run_unsubscribe_callbacks
- self.class.on_unsubscribe_callbacks.each { |callback| send(callback) }
+ def transmit_subscription_rejection
+ logger.info "#{self.class.name} is transmitting the subscription rejection"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
end
end
end
diff --git a/lib/action_cable/channel/callbacks.rb b/lib/action_cable/channel/callbacks.rb
index dcdd27b9a7..295d750e86 100644
--- a/lib/action_cable/channel/callbacks.rb
+++ b/lib/action_cable/channel/callbacks.rb
@@ -1,28 +1,35 @@
+require 'active_support/callbacks'
+
module ActionCable
module Channel
module Callbacks
- extend ActiveSupport::Concern
+ extend ActiveSupport::Concern
+ include ActiveSupport::Callbacks
included do
- class_attribute :on_subscribe_callbacks, :on_unsubscribe_callbacks, instance_reader: false
-
- self.on_subscribe_callbacks = []
- self.on_unsubscribe_callbacks = []
+ define_callbacks :subscribe
+ define_callbacks :unsubscribe
end
- module ClassMethods
- # Name methods that should be called when the channel is subscribed to.
- # (These methods should be private, so they're not callable by the user).
- def on_subscribe(*methods)
- self.on_subscribe_callbacks += methods
+ class_methods do
+ def before_subscribe(*methods, &block)
+ set_callback(:subscribe, :before, *methods, &block)
+ end
+
+ def after_subscribe(*methods, &block)
+ set_callback(:subscribe, :after, *methods, &block)
+ end
+ alias_method :on_subscribe, :after_subscribe
+
+ def before_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :before, *methods, &block)
end
- # Name methods that should be called when the channel is unsubscribed from.
- # (These methods should be private, so they're not callable by the user).
- def on_unsubscribe(*methods)
- self.on_unsubscribe_callbacks += methods
+ def after_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :after, *methods, &block)
end
+ alias_method :on_unsubscribe, :after_unsubscribe
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/channel/periodic_timers.rb b/lib/action_cable/channel/periodic_timers.rb
index 9bdcc87aa5..25fe8e5e54 100644
--- a/lib/action_cable/channel/periodic_timers.rb
+++ b/lib/action_cable/channel/periodic_timers.rb
@@ -7,8 +7,8 @@ module ActionCable
class_attribute :periodic_timers, instance_reader: false
self.periodic_timers = []
- on_subscribe :start_periodic_timers
- on_unsubscribe :stop_periodic_timers
+ after_subscribe :start_periodic_timers
+ after_unsubscribe :stop_periodic_timers
end
module ClassMethods
@@ -38,4 +38,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb
index a37194b884..b5ffa17f72 100644
--- a/lib/action_cable/channel/streams.rb
+++ b/lib/action_cable/channel/streams.rb
@@ -69,12 +69,18 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
- callback ||= default_stream_callback(broadcasting)
+ # Hold off the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+ callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- pubsub.subscribe broadcasting, &callback
- logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ EM.next_tick do
+ pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ transmit_subscription_confirmation
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end
+ end
end
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
@@ -88,7 +94,7 @@ module ActionCable
streams.each do |broadcasting, callback|
pubsub.unsubscribe_proc broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
- end
+ end.clear
end
private
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
index c63621c519..b672e00682 100644
--- a/lib/action_cable/connection.rb
+++ b/lib/action_cable/connection.rb
@@ -1,13 +1,16 @@
module ActionCable
module Connection
- autoload :Authorization, 'action_cable/connection/authorization'
- autoload :Base, 'action_cable/connection/base'
- autoload :Heartbeat, 'action_cable/connection/heartbeat'
- autoload :Identification, 'action_cable/connection/identification'
- autoload :InternalChannel, 'action_cable/connection/internal_channel'
- autoload :MessageBuffer, 'action_cable/connection/message_buffer'
- autoload :WebSocket, 'action_cable/connection/web_socket'
- autoload :Subscriptions, 'action_cable/connection/subscriptions'
- autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Authorization
+ autoload :Base
+ autoload :Identification
+ autoload :InternalChannel
+ autoload :MessageBuffer
+ autoload :WebSocket
+ autoload :Subscriptions
+ autoload :TaggedLoggerProxy
+ end
end
end
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index 08a75156a3..b93b6a8a50 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -1,8 +1,8 @@
-require 'action_dispatch/http/request'
+require 'action_dispatch'
module ActionCable
module Connection
- # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
+ # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
# of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
@@ -37,8 +37,8 @@ module ActionCable
# established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
# identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
#
- # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes
- # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection.
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
#
@@ -48,7 +48,7 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env
+ attr_reader :server, :env, :subscriptions
delegate :worker_pool, :pubsub, to: :server
attr_reader :logger
@@ -59,19 +59,18 @@ module ActionCable
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env)
- @heartbeat = ActionCable::Connection::Heartbeat.new(self)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@started_at = Time.now
end
- # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message
- if websocket.possible?
+ if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
@@ -88,19 +87,18 @@ module ActionCable
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
- logger.error "Received data without a live websocket (#{data.inspect})"
+ logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end
- # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the
+ # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end
- # Close the websocket connection.
+ # Close the WebSocket connection.
def close
- logger.error "Closing connection"
websocket.close
end
@@ -112,12 +110,21 @@ module ActionCable
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
# This can be returned by a health check against the connection.
def statistics
- { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+
+ def beat
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
protected
- # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc.
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@request ||= begin
environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
@@ -125,7 +132,7 @@ module ActionCable
end
end
- # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks.
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
def cookies
request.cookie_jar
end
@@ -133,19 +140,17 @@ module ActionCable
private
attr_reader :websocket
- attr_reader :heartbeat, :subscriptions, :message_buffer
+ attr_reader :message_buffer
def on_open
- server.add_connection(self)
-
connect if respond_to?(:connect)
subscribe_to_internal_channel
- heartbeat.start
+ beat
message_buffer.process!
+ server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
- close
end
def on_message(message)
@@ -159,17 +164,29 @@ module ActionCable
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
- heartbeat.stop
disconnect if respond_to?(:disconnect)
end
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+
+ if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN']
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+
def respond_to_successful_request
websocket.rack_response
end
def respond_to_invalid_request
+ close if websocket.alive?
+
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
@@ -185,17 +202,17 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '',
request.ip,
- Time.now.to_default_s ]
+ Time.now.to_s ]
end
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '',
request.ip,
- Time.now.to_default_s ]
+ Time.now.to_s ]
end
end
end
diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb
deleted file mode 100644
index 2918938ba5..0000000000
--- a/lib/action_cable/connection/heartbeat.rb
+++ /dev/null
@@ -1,30 +0,0 @@
-module ActionCable
- module Connection
- # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
- # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
- # disconnect.
- class Heartbeat
- BEAT_INTERVAL = 3
-
- def initialize(connection)
- @connection = connection
- end
-
- def start
- beat
- @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
- end
-
- def stop
- EventMachine.cancel_timer(@timer) if @timer
- end
-
- private
- attr_reader :connection
-
- def beat
- connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
- end
- end
-end
diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb
index 1be6f9ac76..2d75ff8d6d 100644
--- a/lib/action_cable/connection/identification.rb
+++ b/lib/action_cable/connection/identification.rb
@@ -1,3 +1,5 @@
+require 'set'
+
module ActionCable
module Connection
module Identification
@@ -22,12 +24,22 @@ module ActionCable
# Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
def connection_identifier
- @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
end
private
def connection_gid(ids)
- ids.map { |o| o.to_global_id.to_s }.sort.join(":")
+ ids.map do |o|
+ if o.respond_to? :to_gid_param
+ o.to_gid_param
+ else
+ o.to_s
+ end
+ end.sort.join(":")
end
end
end
diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb
index b00e21824c..c065a24ab7 100644
--- a/lib/action_cable/connection/internal_channel.rb
+++ b/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_redis_subscriptions ||= []
@_internal_redis_subscriptions << [ internal_redis_channel, callback ]
- pubsub.subscribe(internal_redis_channel, &callback)
+ EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) }
+ @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
end
end
diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb
index d5a8e9eba9..25cff75b41 100644
--- a/lib/action_cable/connection/message_buffer.rb
+++ b/lib/action_cable/connection/message_buffer.rb
@@ -1,6 +1,6 @@
module ActionCable
module Connection
- # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them.
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
# Entirely internal operation and should not be used directly by the user.
class MessageBuffer
def initialize(connection)
@@ -50,4 +50,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb
index 69e3f60706..6199db4898 100644
--- a/lib/action_cable/connection/subscriptions.rb
+++ b/lib/action_cable/connection/subscriptions.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
@@ -35,8 +37,12 @@ module ActionCable
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
- subscriptions[data['identifier']].unsubscribe_from_channel
- subscriptions.delete(data['identifier'])
+ remove_subscription subscriptions[data['identifier']]
+ end
+
+ def remove_subscription(subscription)
+ subscription.unsubscribe_from_channel
+ subscriptions.delete(subscription.identifier)
end
def perform_action(data)
diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb
index 854f613f1c..e5319087fb 100644
--- a/lib/action_cable/connection/tagged_logger_proxy.rb
+++ b/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -4,6 +4,8 @@ module ActionCable
# ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
+ attr_reader :tags
+
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten
@@ -14,6 +16,15 @@ module ActionCable
@tags = @tags.uniq
end
+ def tag(logger)
+ if logger.respond_to?(:tagged)
+ current_tags = tags - logger.formatter.current_tags
+ logger.tagged(*current_tags) { yield }
+ else
+ yield
+ end
+ end
+
%i( debug info warn error fatal unknown ).each do |severity|
define_method(severity) do |message|
log severity, message
@@ -22,7 +33,7 @@ module ActionCable
protected
def log(type, message)
- @logger.tagged(*@tags) { @logger.send type, message }
+ tag(@logger) { @logger.send type, message }
end
end
end
diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb
index 135a28cfe4..169b683b8c 100644
--- a/lib/action_cable/connection/web_socket.rb
+++ b/lib/action_cable/connection/web_socket.rb
@@ -1,3 +1,5 @@
+require 'faye/websocket'
+
module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.
diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb
index 6c943c7971..613a9b99f2 100644
--- a/lib/action_cable/engine.rb
+++ b/lib/action_cable/engine.rb
@@ -1,4 +1,22 @@
+require 'rails/engine'
+require 'active_support/ordered_options'
+
module ActionCable
class Engine < ::Rails::Engine
+ config.action_cable = ActiveSupport::OrderedOptions.new
+
+ initializer "action_cable.logger" do
+ ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger }
+ end
+
+ initializer "action_cable.set_configs" do |app|
+ options = app.config.action_cable
+
+ options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
+
+ ActiveSupport.on_load(:action_cable) do
+ options.each { |k,v| send("#{k}=", v) }
+ end
+ end
end
end
diff --git a/lib/action_cable/process/logging.rb b/lib/action_cable/process/logging.rb
index 827a58fdb8..618ba7357a 100644
--- a/lib/action_cable/process/logging.rb
+++ b/lib/action_cable/process/logging.rb
@@ -1,3 +1,7 @@
+require 'action_cable/server'
+require 'eventmachine'
+require 'celluloid'
+
EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index 919ebd94de..a2a89d5f1e 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -1,11 +1,19 @@
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module Server
- autoload :Base, 'action_cable/server/base'
- autoload :Broadcasting, 'action_cable/server/broadcasting'
- autoload :Connections, 'action_cable/server/connections'
- autoload :Configuration, 'action_cable/server/configuration'
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Connections
+ autoload :Configuration
- autoload :Worker, 'action_cable/server/worker'
- autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections'
+ autoload :Worker
+ autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management'
+ end
end
end
diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb
index 43849928b9..f1585dc776 100644
--- a/lib/action_cable/server/base.rb
+++ b/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'em-hiredis'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -18,6 +20,7 @@ module ActionCable
# Called by rack to setup the server.
def call(env)
+ setup_heartbeat_timer
config.connection_class.new(self, env).process
end
@@ -65,5 +68,7 @@ module ActionCable
config.connection_class.identifiers
end
end
+
+ ActiveSupport.run_load_hooks(:action_cable, Base.config)
end
end
diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb
index 037b98951e..6e0fbae387 100644
--- a/lib/action_cable/server/broadcasting.rb
+++ b/lib/action_cable/server/broadcasting.rb
@@ -1,3 +1,5 @@
+require 'redis'
+
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
@@ -44,9 +46,9 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.broadcasting_redis.publish broadcasting, message.to_json
+ server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/server/configuration.rb b/lib/action_cable/server/configuration.rb
index 4808d170ff..89a0caddb4 100644
--- a/lib/action_cable/server/configuration.rb
+++ b/lib/action_cable/server/configuration.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
module ActionCable
module Server
# An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points
@@ -6,6 +8,7 @@ module ActionCable
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :redis_path, :channels_path
+ attr_accessor :disable_request_forgery_protection, :allowed_request_origins
def initialize
@logger = Rails.logger
@@ -16,6 +19,8 @@ module ActionCable
@redis_path = Rails.root.join('config/redis/cable.yml')
@channels_path = Rails.root.join('app/channels')
+
+ @disable_request_forgery_protection = false
end
def log_to_stdout
diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb
index 15d7c3c8c7..47dcea8c20 100644
--- a/lib/action_cable/server/connections.rb
+++ b/lib/action_cable/server/connections.rb
@@ -4,6 +4,8 @@ module ActionCable
# you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
# As such, this is primarily for internal use.
module Connections
+ BEAT_INTERVAL = 3
+
def connections
@connections ||= []
end
@@ -16,9 +18,20 @@ module ActionCable
connections.delete connection
end
+ # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
+ # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
+ # disconnect.
+ def setup_heartbeat_timer
+ EM.next_tick do
+ @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
+ EM.next_tick { connections.map(&:beat) }
+ end
+ end
+ end
+
def open_connections_statistics
connections.map(&:statistics)
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb
index d7823ecf93..e063b2a2e1 100644
--- a/lib/action_cable/server/worker.rb
+++ b/lib/action_cable/server/worker.rb
@@ -1,3 +1,6 @@
+require 'celluloid'
+require 'active_support/callbacks'
+
module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use.
@@ -5,10 +8,13 @@ module ActionCable
include ActiveSupport::Callbacks
include Celluloid
+ attr_reader :connection
define_callbacks :work
- include ClearDatabaseConnections
+ include ActiveRecordConnectionManagement
def invoke(receiver, method, *args)
+ @connection = receiver
+
run_callbacks :work do
receiver.send method, *args
end
@@ -20,6 +26,8 @@ module ActionCable
end
def run_periodic_timer(channel, callback)
+ @connection = channel.connection
+
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
@@ -31,4 +39,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb
index 722d363a41..ecece4e270 100644
--- a/lib/action_cable/server/worker/clear_database_connections.rb
+++ b/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -2,7 +2,7 @@ module ActionCable
module Server
class Worker
# Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
- module ClearDatabaseConnections
+ module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
included do
@@ -12,7 +12,7 @@ module ActionCable
end
def with_database_connections
- yield
+ connection.logger.tag(ActiveRecord::Base.logger) { yield }
ensure
ActiveRecord::Base.clear_active_connections!
end
diff --git a/lib/assets/javascripts/cable.js.coffee b/lib/assets/javascripts/cable.coffee.erb
index 0bd1757505..8498233c11 100644
--- a/lib/assets/javascripts/cable.js.coffee
+++ b/lib/assets/javascripts/cable.coffee.erb
@@ -2,7 +2,7 @@
#= require cable/consumer
@Cable =
- PING_IDENTIFIER: "_ping"
+ INTERNAL: <%= ActionCable::INTERNAL.to_json %>
createConsumer: (url) ->
new Cable.Consumer url
diff --git a/lib/assets/javascripts/cable/connection.coffee b/lib/assets/javascripts/cable/connection.coffee
new file mode 100644
index 0000000000..b2abe8dcb2
--- /dev/null
+++ b/lib/assets/javascripts/cable/connection.coffee
@@ -0,0 +1,84 @@
+# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation.
+
+{message_types} = Cable.INTERNAL
+
+class Cable.Connection
+ @reopenDelay: 500
+
+ constructor: (@consumer) ->
+ @open()
+
+ send: (data) ->
+ if @isOpen()
+ @webSocket.send(JSON.stringify(data))
+ true
+ else
+ false
+
+ open: =>
+ if @webSocket and not @isState("closed")
+ throw new Error("Existing connection must be closed before opening")
+ else
+ @webSocket = new WebSocket(@consumer.url)
+ @installEventHandlers()
+ true
+
+ close: ->
+ @webSocket?.close()
+
+ reopen: ->
+ if @isState("closed")
+ @open()
+ else
+ try
+ @close()
+ finally
+ setTimeout(@open, @constructor.reopenDelay)
+
+ isOpen: ->
+ @isState("open")
+
+ # Private
+
+ isState: (states...) ->
+ @getState() in states
+
+ getState: ->
+ return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState
+ null
+
+ installEventHandlers: ->
+ for eventName of @events
+ handler = @events[eventName].bind(this)
+ @webSocket["on#{eventName}"] = handler
+ return
+
+ events:
+ message: (event) ->
+ {identifier, message, type} = JSON.parse(event.data)
+
+ switch type
+ when message_types.confirmation
+ @consumer.subscriptions.notify(identifier, "connected")
+ when message_types.rejection
+ @consumer.subscriptions.reject(identifier)
+ else
+ @consumer.subscriptions.notify(identifier, "received", message)
+
+ open: ->
+ @disconnected = false
+ @consumer.subscriptions.reload()
+
+ close: ->
+ @disconnect()
+
+ error: ->
+ @disconnect()
+
+ disconnect: ->
+ return if @disconnected
+ @disconnected = true
+ @consumer.subscriptions.notifyAll("disconnected")
+
+ toJSON: ->
+ state: @getState()
diff --git a/lib/assets/javascripts/cable/connection.js.coffee b/lib/assets/javascripts/cable/connection.js.coffee
deleted file mode 100644
index 464f0c1ff7..0000000000
--- a/lib/assets/javascripts/cable/connection.js.coffee
+++ /dev/null
@@ -1,77 +0,0 @@
-# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation.
-class Cable.Connection
- constructor: (@consumer) ->
- @open()
-
- send: (data) ->
- if @isOpen()
- @webSocket.send(JSON.stringify(data))
- true
- else
- false
-
- open: ->
- return if @isState("open", "connecting")
- @webSocket = new WebSocket(@consumer.url)
- @installEventHandlers()
-
- close: ->
- return if @isState("closed", "closing")
- @webSocket?.close()
-
- reopen: ->
- if @isOpen()
- @closeSilently => @open()
- else
- @open()
-
- isOpen: ->
- @isState("open")
-
- # Private
-
- isState: (states...) ->
- @getState() in states
-
- getState: ->
- return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState
- null
-
- closeSilently: (callback = ->) ->
- @uninstallEventHandlers()
- @installEventHandler("close", callback)
- @installEventHandler("error", callback)
- try
- @webSocket.close()
- finally
- @uninstallEventHandlers()
-
- installEventHandlers: ->
- for eventName of @events
- @installEventHandler(eventName)
-
- installEventHandler: (eventName, handler) ->
- handler ?= @events[eventName].bind(this)
- @webSocket.addEventListener(eventName, handler)
-
- uninstallEventHandlers: ->
- for eventName of @events
- @webSocket.removeEventListener(eventName)
-
- events:
- message: (event) ->
- {identifier, message} = JSON.parse(event.data)
- @consumer.subscriptions.notify(identifier, "received", message)
-
- open: ->
- @consumer.subscriptions.reload()
-
- close: ->
- @consumer.subscriptions.notifyAll("disconnected")
-
- error: ->
- @consumer.subscriptions.notifyAll("disconnected")
- @closeSilently()
-
- toJSON: ->
- state: @getState()
diff --git a/lib/assets/javascripts/cable/connection_monitor.js.coffee b/lib/assets/javascripts/cable/connection_monitor.coffee
index cac65d9043..435efcc361 100644
--- a/lib/assets/javascripts/cable/connection_monitor.js.coffee
+++ b/lib/assets/javascripts/cable/connection_monitor.coffee
@@ -1,15 +1,13 @@
# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting
# revival reconnections if things go astray. Internal class, not intended for direct user manipulation.
class Cable.ConnectionMonitor
- identifier: Cable.PING_IDENTIFIER
-
- pollInterval:
- min: 2
+ @pollInterval:
+ min: 3
max: 30
- staleThreshold:
- startedAt: 4
- pingedAt: 8
+ @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings)
+
+ identifier: Cable.INTERNAL.identifiers.ping
constructor: (@consumer) ->
@consumer.subscriptions.add(this)
@@ -18,6 +16,10 @@ class Cable.ConnectionMonitor
connected: ->
@reset()
@pingedAt = now()
+ delete @disconnectedAt
+
+ disconnected: ->
+ @disconnectedAt = now()
received: ->
@pingedAt = now()
@@ -30,9 +32,11 @@ class Cable.ConnectionMonitor
delete @stoppedAt
@startedAt = now()
@poll()
+ document.addEventListener("visibilitychange", @visibilityDidChange)
stop: ->
@stoppedAt = now()
+ document.removeEventListener("visibilitychange", @visibilityDidChange)
poll: ->
setTimeout =>
@@ -42,20 +46,28 @@ class Cable.ConnectionMonitor
, @getInterval()
getInterval: ->
- {min, max} = @pollInterval
- interval = 4 * Math.log(@reconnectAttempts + 1)
+ {min, max} = @constructor.pollInterval
+ interval = 5 * Math.log(@reconnectAttempts + 1)
clamp(interval, min, max) * 1000
reconnectIfStale: ->
if @connectionIsStale()
- @reconnectAttempts += 1
- @consumer.connection.reopen()
+ @reconnectAttempts++
+ unless @disconnectedRecently()
+ @consumer.connection.reopen()
connectionIsStale: ->
- if @pingedAt
- secondsSince(@pingedAt) > @staleThreshold.pingedAt
- else
- secondsSince(@startedAt) > @staleThreshold.startedAt
+ secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold
+
+ disconnectedRecently: ->
+ @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold
+
+ visibilityDidChange: =>
+ if document.visibilityState is "visible"
+ setTimeout =>
+ if @connectionIsStale() or not @consumer.connection.isOpen()
+ @consumer.connection.reopen()
+ , 200
toJSON: ->
interval = @getInterval()
diff --git a/lib/assets/javascripts/cable/consumer.js.coffee b/lib/assets/javascripts/cable/consumer.coffee
index 05a7398e79..05a7398e79 100644
--- a/lib/assets/javascripts/cable/consumer.js.coffee
+++ b/lib/assets/javascripts/cable/consumer.coffee
diff --git a/lib/assets/javascripts/cable/subscription.js.coffee b/lib/assets/javascripts/cable/subscription.coffee
index 5b024d4e15..5b024d4e15 100644
--- a/lib/assets/javascripts/cable/subscription.js.coffee
+++ b/lib/assets/javascripts/cable/subscription.coffee
diff --git a/lib/assets/javascripts/cable/subscriptions.js.coffee b/lib/assets/javascripts/cable/subscriptions.coffee
index fe6975c870..7955565f06 100644
--- a/lib/assets/javascripts/cable/subscriptions.js.coffee
+++ b/lib/assets/javascripts/cable/subscriptions.coffee
@@ -9,6 +9,7 @@
class Cable.Subscriptions
constructor: (@consumer) ->
@subscriptions = []
+ @history = []
create: (channelName, mixin) ->
channel = channelName
@@ -20,22 +21,29 @@ class Cable.Subscriptions
add: (subscription) ->
@subscriptions.push(subscription)
@notify(subscription, "initialized")
- if @sendCommand(subscription, "subscribe")
- @notify(subscription, "connected")
-
- reload: ->
- for subscription in @subscriptions
- if @sendCommand(subscription, "subscribe")
- @notify(subscription, "connected")
+ @sendCommand(subscription, "subscribe")
remove: (subscription) ->
- @subscriptions = (s for s in @subscriptions when s isnt subscription)
+ @forget(subscription)
+
unless @findAll(subscription.identifier).length
@sendCommand(subscription, "unsubscribe")
+ reject: (identifier) ->
+ for subscription in @findAll(identifier)
+ @forget(subscription)
+ @notify(subscription, "rejected")
+
+ forget: (subscription) ->
+ @subscriptions = (s for s in @subscriptions when s isnt subscription)
+
findAll: (identifier) ->
s for s in @subscriptions when s.identifier is identifier
+ reload: ->
+ for subscription in @subscriptions
+ @sendCommand(subscription, "subscribe")
+
notifyAll: (callbackName, args...) ->
for subscription in @subscriptions
@notify(subscription, callbackName, args...)
@@ -49,12 +57,22 @@ class Cable.Subscriptions
for subscription in subscriptions
subscription[callbackName]?(args...)
+ if callbackName in ["initialized", "connected", "disconnected", "rejected"]
+ {identifier} = subscription
+ @record(notification: {identifier, callbackName, args})
+
sendCommand: (subscription, command) ->
{identifier} = subscription
- if identifier is Cable.PING_IDENTIFIER
+ if identifier is Cable.INTERNAL.identifiers.ping
@consumer.connection.isOpen()
else
@consumer.send({command, identifier})
+ record: (data) ->
+ data.time = new Date()
+ @history = @history.slice(-19)
+ @history.push(data)
+
toJSON: ->
- subscription.identifier for subscription in @subscriptions
+ history: @history
+ identifiers: (subscription.identifier for subscription in @subscriptions)