aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'lib/action_cable')
-rw-r--r--lib/action_cable/channel.rb16
-rw-r--r--lib/action_cable/channel/base.rb84
-rw-r--r--lib/action_cable/channel/callbacks.rb37
-rw-r--r--lib/action_cable/channel/periodic_timers.rb6
-rw-r--r--lib/action_cable/channel/streams.rb14
-rw-r--r--lib/action_cable/connection.rb21
-rw-r--r--lib/action_cable/connection/base.rb67
-rw-r--r--lib/action_cable/connection/heartbeat.rb30
-rw-r--r--lib/action_cable/connection/identification.rb16
-rw-r--r--lib/action_cable/connection/internal_channel.rb4
-rw-r--r--lib/action_cable/connection/message_buffer.rb4
-rw-r--r--lib/action_cable/connection/subscriptions.rb10
-rw-r--r--lib/action_cable/connection/tagged_logger_proxy.rb13
-rw-r--r--lib/action_cable/connection/web_socket.rb2
-rw-r--r--lib/action_cable/engine.rb18
-rw-r--r--lib/action_cable/process/logging.rb4
-rw-r--r--lib/action_cable/server.rb20
-rw-r--r--lib/action_cable/server/base.rb5
-rw-r--r--lib/action_cable/server/broadcasting.rb6
-rw-r--r--lib/action_cable/server/configuration.rb5
-rw-r--r--lib/action_cable/server/connections.rb15
-rw-r--r--lib/action_cable/server/worker.rb12
-rw-r--r--lib/action_cable/server/worker/active_record_connection_management.rb (renamed from lib/action_cable/server/worker/clear_database_connections.rb)4
23 files changed, 285 insertions, 128 deletions
diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb
index 3b973ba0a7..7ae262ce5f 100644
--- a/lib/action_cable/channel.rb
+++ b/lib/action_cable/channel.rb
@@ -1,10 +1,14 @@
module ActionCable
module Channel
- autoload :Base, 'action_cable/channel/base'
- autoload :Broadcasting, 'action_cable/channel/broadcasting'
- autoload :Callbacks, 'action_cable/channel/callbacks'
- autoload :Naming, 'action_cable/channel/naming'
- autoload :PeriodicTimers, 'action_cable/channel/periodic_timers'
- autoload :Streams, 'action_cable/channel/streams'
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Callbacks
+ autoload :Naming
+ autoload :PeriodicTimers
+ autoload :Streams
+ end
end
end
diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb
index 2f1b4a187d..ca903a810d 100644
--- a/lib/action_cable/channel/base.rb
+++ b/lib/action_cable/channel/base.rb
@@ -1,6 +1,8 @@
+require 'set'
+
module ActionCable
module Channel
- # The channel provides the basic structure of grouping behavior into logical units when communicating over the websocket connection.
+ # The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
# You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
# responding to the subscriber's direct requests.
#
@@ -64,6 +66,22 @@ module ActionCable
#
# Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection.
# All such identifiers will automatically create a delegation method of the same name on the channel instance.
+ #
+ # == Rejecting subscription requests
+ #
+ # A channel can reject a subscription request in the #subscribed callback by invoking #reject!
+ #
+ # Example:
+ #
+ # class ChatChannel < ApplicationCable::Channel
+ # def subscribed
+ # @room = Chat::Room[params[:room_number]]
+ # reject unless current_user.can_access?(@room)
+ # end
+ # end
+ #
+ # In this example, the subscription will be rejected if the current_user does not have access to the chat room.
+ # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request.
class Base
include Callbacks
include PeriodicTimers
@@ -71,10 +89,7 @@ module ActionCable
include Naming
include Broadcasting
- on_subscribe :subscribed
- on_unsubscribe :unsubscribed
-
- attr_reader :params, :connection
+ attr_reader :params, :connection, :identifier
delegate :logger, to: :connection
class << self
@@ -118,6 +133,10 @@ module ActionCable
@identifier = identifier
@params = params
+ # When a channel is streaming via redis pubsub, we want to delay the confirmation
+ # transmission until redis pubsub subscription is confirmed.
+ @defer_subscription_confirmation = false
+
delegate_connection_identifiers
subscribe_to_channel
end
@@ -138,8 +157,9 @@ module ActionCable
# Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel
- run_unsubscribe_callbacks
- logger.info "#{self.class.name} unsubscribed"
+ run_callbacks :unsubscribe do
+ unsubscribed
+ end
end
@@ -160,9 +180,28 @@ module ActionCable
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
- connection.transmit({ identifier: @identifier, message: data }.to_json)
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
end
+ def defer_subscription_confirmation!
+ @defer_subscription_confirmation = true
+ end
+
+ def defer_subscription_confirmation?
+ @defer_subscription_confirmation
+ end
+
+ def subscription_confirmation_sent?
+ @subscription_confirmation_sent
+ end
+
+ def reject
+ @reject_subscription = true
+ end
+
+ def subscription_rejected?
+ @reject_subscription
+ end
private
def delegate_connection_identifiers
@@ -175,8 +214,15 @@ module ActionCable
def subscribe_to_channel
- logger.info "#{self.class.name} subscribing"
- run_subscribe_callbacks
+ run_callbacks :subscribe do
+ subscribed
+ end
+
+ if subscription_rejected?
+ reject_subscription
+ else
+ transmit_subscription_confirmation unless defer_subscription_confirmation?
+ end
end
@@ -206,12 +252,22 @@ module ActionCable
end
end
- def run_subscribe_callbacks
- self.class.on_subscribe_callbacks.each { |callback| send(callback) }
+ def transmit_subscription_confirmation
+ unless subscription_confirmation_sent?
+ logger.info "#{self.class.name} is transmitting the subscription confirmation"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
+ @subscription_confirmation_sent = true
+ end
+ end
+
+ def reject_subscription
+ connection.subscriptions.remove_subscription self
+ transmit_subscription_rejection
end
- def run_unsubscribe_callbacks
- self.class.on_unsubscribe_callbacks.each { |callback| send(callback) }
+ def transmit_subscription_rejection
+ logger.info "#{self.class.name} is transmitting the subscription rejection"
+ connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
end
end
end
diff --git a/lib/action_cable/channel/callbacks.rb b/lib/action_cable/channel/callbacks.rb
index dcdd27b9a7..295d750e86 100644
--- a/lib/action_cable/channel/callbacks.rb
+++ b/lib/action_cable/channel/callbacks.rb
@@ -1,28 +1,35 @@
+require 'active_support/callbacks'
+
module ActionCable
module Channel
module Callbacks
- extend ActiveSupport::Concern
+ extend ActiveSupport::Concern
+ include ActiveSupport::Callbacks
included do
- class_attribute :on_subscribe_callbacks, :on_unsubscribe_callbacks, instance_reader: false
-
- self.on_subscribe_callbacks = []
- self.on_unsubscribe_callbacks = []
+ define_callbacks :subscribe
+ define_callbacks :unsubscribe
end
- module ClassMethods
- # Name methods that should be called when the channel is subscribed to.
- # (These methods should be private, so they're not callable by the user).
- def on_subscribe(*methods)
- self.on_subscribe_callbacks += methods
+ class_methods do
+ def before_subscribe(*methods, &block)
+ set_callback(:subscribe, :before, *methods, &block)
+ end
+
+ def after_subscribe(*methods, &block)
+ set_callback(:subscribe, :after, *methods, &block)
+ end
+ alias_method :on_subscribe, :after_subscribe
+
+ def before_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :before, *methods, &block)
end
- # Name methods that should be called when the channel is unsubscribed from.
- # (These methods should be private, so they're not callable by the user).
- def on_unsubscribe(*methods)
- self.on_unsubscribe_callbacks += methods
+ def after_unsubscribe(*methods, &block)
+ set_callback(:unsubscribe, :after, *methods, &block)
end
+ alias_method :on_unsubscribe, :after_unsubscribe
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/channel/periodic_timers.rb b/lib/action_cable/channel/periodic_timers.rb
index 9bdcc87aa5..25fe8e5e54 100644
--- a/lib/action_cable/channel/periodic_timers.rb
+++ b/lib/action_cable/channel/periodic_timers.rb
@@ -7,8 +7,8 @@ module ActionCable
class_attribute :periodic_timers, instance_reader: false
self.periodic_timers = []
- on_subscribe :start_periodic_timers
- on_unsubscribe :stop_periodic_timers
+ after_subscribe :start_periodic_timers
+ after_unsubscribe :stop_periodic_timers
end
module ClassMethods
@@ -38,4 +38,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/channel/streams.rb b/lib/action_cable/channel/streams.rb
index a37194b884..b5ffa17f72 100644
--- a/lib/action_cable/channel/streams.rb
+++ b/lib/action_cable/channel/streams.rb
@@ -69,12 +69,18 @@ module ActionCable
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
- callback ||= default_stream_callback(broadcasting)
+ # Hold off the confirmation until pubsub#subscribe is successful
+ defer_subscription_confirmation!
+ callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
- pubsub.subscribe broadcasting, &callback
- logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ EM.next_tick do
+ pubsub.subscribe(broadcasting, &callback).callback do |reply|
+ transmit_subscription_confirmation
+ logger.info "#{self.class.name} is streaming from #{broadcasting}"
+ end
+ end
end
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
@@ -88,7 +94,7 @@ module ActionCable
streams.each do |broadcasting, callback|
pubsub.unsubscribe_proc broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
- end
+ end.clear
end
private
diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb
index c63621c519..b672e00682 100644
--- a/lib/action_cable/connection.rb
+++ b/lib/action_cable/connection.rb
@@ -1,13 +1,16 @@
module ActionCable
module Connection
- autoload :Authorization, 'action_cable/connection/authorization'
- autoload :Base, 'action_cable/connection/base'
- autoload :Heartbeat, 'action_cable/connection/heartbeat'
- autoload :Identification, 'action_cable/connection/identification'
- autoload :InternalChannel, 'action_cable/connection/internal_channel'
- autoload :MessageBuffer, 'action_cable/connection/message_buffer'
- autoload :WebSocket, 'action_cable/connection/web_socket'
- autoload :Subscriptions, 'action_cable/connection/subscriptions'
- autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Authorization
+ autoload :Base
+ autoload :Identification
+ autoload :InternalChannel
+ autoload :MessageBuffer
+ autoload :WebSocket
+ autoload :Subscriptions
+ autoload :TaggedLoggerProxy
+ end
end
end
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index 08a75156a3..b93b6a8a50 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -1,8 +1,8 @@
-require 'action_dispatch/http/request'
+require 'action_dispatch'
module ActionCable
module Connection
- # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
+ # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
# of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
@@ -37,8 +37,8 @@ module ActionCable
# established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
# identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
#
- # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes
- # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection.
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
#
@@ -48,7 +48,7 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env
+ attr_reader :server, :env, :subscriptions
delegate :worker_pool, :pubsub, to: :server
attr_reader :logger
@@ -59,19 +59,18 @@ module ActionCable
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env)
- @heartbeat = ActionCable::Connection::Heartbeat.new(self)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@started_at = Time.now
end
- # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message
- if websocket.possible?
+ if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
@@ -88,19 +87,18 @@ module ActionCable
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
- logger.error "Received data without a live websocket (#{data.inspect})"
+ logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end
- # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the
+ # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end
- # Close the websocket connection.
+ # Close the WebSocket connection.
def close
- logger.error "Closing connection"
websocket.close
end
@@ -112,12 +110,21 @@ module ActionCable
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
# This can be returned by a health check against the connection.
def statistics
- { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+
+ def beat
+ transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
protected
- # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc.
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@request ||= begin
environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
@@ -125,7 +132,7 @@ module ActionCable
end
end
- # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks.
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
def cookies
request.cookie_jar
end
@@ -133,19 +140,17 @@ module ActionCable
private
attr_reader :websocket
- attr_reader :heartbeat, :subscriptions, :message_buffer
+ attr_reader :message_buffer
def on_open
- server.add_connection(self)
-
connect if respond_to?(:connect)
subscribe_to_internal_channel
- heartbeat.start
+ beat
message_buffer.process!
+ server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
- close
end
def on_message(message)
@@ -159,17 +164,29 @@ module ActionCable
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
- heartbeat.stop
disconnect if respond_to?(:disconnect)
end
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+
+ if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN']
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+
def respond_to_successful_request
websocket.rack_response
end
def respond_to_invalid_request
+ close if websocket.alive?
+
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
@@ -185,17 +202,17 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '',
request.ip,
- Time.now.to_default_s ]
+ Time.now.to_s ]
end
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '',
request.ip,
- Time.now.to_default_s ]
+ Time.now.to_s ]
end
end
end
diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb
deleted file mode 100644
index 2918938ba5..0000000000
--- a/lib/action_cable/connection/heartbeat.rb
+++ /dev/null
@@ -1,30 +0,0 @@
-module ActionCable
- module Connection
- # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
- # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
- # disconnect.
- class Heartbeat
- BEAT_INTERVAL = 3
-
- def initialize(connection)
- @connection = connection
- end
-
- def start
- beat
- @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
- end
-
- def stop
- EventMachine.cancel_timer(@timer) if @timer
- end
-
- private
- attr_reader :connection
-
- def beat
- connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
- end
- end
-end
diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb
index 1be6f9ac76..2d75ff8d6d 100644
--- a/lib/action_cable/connection/identification.rb
+++ b/lib/action_cable/connection/identification.rb
@@ -1,3 +1,5 @@
+require 'set'
+
module ActionCable
module Connection
module Identification
@@ -22,12 +24,22 @@ module ActionCable
# Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
def connection_identifier
- @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
end
private
def connection_gid(ids)
- ids.map { |o| o.to_global_id.to_s }.sort.join(":")
+ ids.map do |o|
+ if o.respond_to? :to_gid_param
+ o.to_gid_param
+ else
+ o.to_s
+ end
+ end.sort.join(":")
end
end
end
diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb
index b00e21824c..c065a24ab7 100644
--- a/lib/action_cable/connection/internal_channel.rb
+++ b/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_redis_subscriptions ||= []
@_internal_redis_subscriptions << [ internal_redis_channel, callback ]
- pubsub.subscribe(internal_redis_channel, &callback)
+ EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) }
+ @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
end
end
diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb
index d5a8e9eba9..25cff75b41 100644
--- a/lib/action_cable/connection/message_buffer.rb
+++ b/lib/action_cable/connection/message_buffer.rb
@@ -1,6 +1,6 @@
module ActionCable
module Connection
- # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them.
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
# Entirely internal operation and should not be used directly by the user.
class MessageBuffer
def initialize(connection)
@@ -50,4 +50,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb
index 69e3f60706..6199db4898 100644
--- a/lib/action_cable/connection/subscriptions.rb
+++ b/lib/action_cable/connection/subscriptions.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
@@ -35,8 +37,12 @@ module ActionCable
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
- subscriptions[data['identifier']].unsubscribe_from_channel
- subscriptions.delete(data['identifier'])
+ remove_subscription subscriptions[data['identifier']]
+ end
+
+ def remove_subscription(subscription)
+ subscription.unsubscribe_from_channel
+ subscriptions.delete(subscription.identifier)
end
def perform_action(data)
diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb
index 854f613f1c..e5319087fb 100644
--- a/lib/action_cable/connection/tagged_logger_proxy.rb
+++ b/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -4,6 +4,8 @@ module ActionCable
# ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
+ attr_reader :tags
+
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten
@@ -14,6 +16,15 @@ module ActionCable
@tags = @tags.uniq
end
+ def tag(logger)
+ if logger.respond_to?(:tagged)
+ current_tags = tags - logger.formatter.current_tags
+ logger.tagged(*current_tags) { yield }
+ else
+ yield
+ end
+ end
+
%i( debug info warn error fatal unknown ).each do |severity|
define_method(severity) do |message|
log severity, message
@@ -22,7 +33,7 @@ module ActionCable
protected
def log(type, message)
- @logger.tagged(*@tags) { @logger.send type, message }
+ tag(@logger) { @logger.send type, message }
end
end
end
diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb
index 135a28cfe4..169b683b8c 100644
--- a/lib/action_cable/connection/web_socket.rb
+++ b/lib/action_cable/connection/web_socket.rb
@@ -1,3 +1,5 @@
+require 'faye/websocket'
+
module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.
diff --git a/lib/action_cable/engine.rb b/lib/action_cable/engine.rb
index 6c943c7971..613a9b99f2 100644
--- a/lib/action_cable/engine.rb
+++ b/lib/action_cable/engine.rb
@@ -1,4 +1,22 @@
+require 'rails/engine'
+require 'active_support/ordered_options'
+
module ActionCable
class Engine < ::Rails::Engine
+ config.action_cable = ActiveSupport::OrderedOptions.new
+
+ initializer "action_cable.logger" do
+ ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger }
+ end
+
+ initializer "action_cable.set_configs" do |app|
+ options = app.config.action_cable
+
+ options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
+
+ ActiveSupport.on_load(:action_cable) do
+ options.each { |k,v| send("#{k}=", v) }
+ end
+ end
end
end
diff --git a/lib/action_cable/process/logging.rb b/lib/action_cable/process/logging.rb
index 827a58fdb8..618ba7357a 100644
--- a/lib/action_cable/process/logging.rb
+++ b/lib/action_cable/process/logging.rb
@@ -1,3 +1,7 @@
+require 'action_cable/server'
+require 'eventmachine'
+require 'celluloid'
+
EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb
index 919ebd94de..a2a89d5f1e 100644
--- a/lib/action_cable/server.rb
+++ b/lib/action_cable/server.rb
@@ -1,11 +1,19 @@
+require 'eventmachine'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module Server
- autoload :Base, 'action_cable/server/base'
- autoload :Broadcasting, 'action_cable/server/broadcasting'
- autoload :Connections, 'action_cable/server/connections'
- autoload :Configuration, 'action_cable/server/configuration'
+ extend ActiveSupport::Autoload
+
+ eager_autoload do
+ autoload :Base
+ autoload :Broadcasting
+ autoload :Connections
+ autoload :Configuration
- autoload :Worker, 'action_cable/server/worker'
- autoload :ClearDatabaseConnections, 'action_cable/server/worker/clear_database_connections'
+ autoload :Worker
+ autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management'
+ end
end
end
diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb
index 43849928b9..f1585dc776 100644
--- a/lib/action_cable/server/base.rb
+++ b/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'em-hiredis'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -18,6 +20,7 @@ module ActionCable
# Called by rack to setup the server.
def call(env)
+ setup_heartbeat_timer
config.connection_class.new(self, env).process
end
@@ -65,5 +68,7 @@ module ActionCable
config.connection_class.identifiers
end
end
+
+ ActiveSupport.run_load_hooks(:action_cable, Base.config)
end
end
diff --git a/lib/action_cable/server/broadcasting.rb b/lib/action_cable/server/broadcasting.rb
index 037b98951e..6e0fbae387 100644
--- a/lib/action_cable/server/broadcasting.rb
+++ b/lib/action_cable/server/broadcasting.rb
@@ -1,3 +1,5 @@
+require 'redis'
+
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
@@ -44,9 +46,9 @@ module ActionCable
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
- server.broadcasting_redis.publish broadcasting, message.to_json
+ server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/server/configuration.rb b/lib/action_cable/server/configuration.rb
index 4808d170ff..89a0caddb4 100644
--- a/lib/action_cable/server/configuration.rb
+++ b/lib/action_cable/server/configuration.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
module ActionCable
module Server
# An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points
@@ -6,6 +8,7 @@ module ActionCable
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :redis_path, :channels_path
+ attr_accessor :disable_request_forgery_protection, :allowed_request_origins
def initialize
@logger = Rails.logger
@@ -16,6 +19,8 @@ module ActionCable
@redis_path = Rails.root.join('config/redis/cable.yml')
@channels_path = Rails.root.join('app/channels')
+
+ @disable_request_forgery_protection = false
end
def log_to_stdout
diff --git a/lib/action_cable/server/connections.rb b/lib/action_cable/server/connections.rb
index 15d7c3c8c7..47dcea8c20 100644
--- a/lib/action_cable/server/connections.rb
+++ b/lib/action_cable/server/connections.rb
@@ -4,6 +4,8 @@ module ActionCable
# you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
# As such, this is primarily for internal use.
module Connections
+ BEAT_INTERVAL = 3
+
def connections
@connections ||= []
end
@@ -16,9 +18,20 @@ module ActionCable
connections.delete connection
end
+ # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
+ # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
+ # disconnect.
+ def setup_heartbeat_timer
+ EM.next_tick do
+ @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
+ EM.next_tick { connections.map(&:beat) }
+ end
+ end
+ end
+
def open_connections_statistics
connections.map(&:statistics)
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb
index d7823ecf93..e063b2a2e1 100644
--- a/lib/action_cable/server/worker.rb
+++ b/lib/action_cable/server/worker.rb
@@ -1,3 +1,6 @@
+require 'celluloid'
+require 'active_support/callbacks'
+
module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use.
@@ -5,10 +8,13 @@ module ActionCable
include ActiveSupport::Callbacks
include Celluloid
+ attr_reader :connection
define_callbacks :work
- include ClearDatabaseConnections
+ include ActiveRecordConnectionManagement
def invoke(receiver, method, *args)
+ @connection = receiver
+
run_callbacks :work do
receiver.send method, *args
end
@@ -20,6 +26,8 @@ module ActionCable
end
def run_periodic_timer(channel, callback)
+ @connection = channel.connection
+
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
@@ -31,4 +39,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/server/worker/clear_database_connections.rb b/lib/action_cable/server/worker/active_record_connection_management.rb
index 722d363a41..ecece4e270 100644
--- a/lib/action_cable/server/worker/clear_database_connections.rb
+++ b/lib/action_cable/server/worker/active_record_connection_management.rb
@@ -2,7 +2,7 @@ module ActionCable
module Server
class Worker
# Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
- module ClearDatabaseConnections
+ module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
included do
@@ -12,7 +12,7 @@ module ActionCable
end
def with_database_connections
- yield
+ connection.logger.tag(ActiveRecord::Base.logger) { yield }
ensure
ActiveRecord::Base.clear_active_connections!
end