aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/connection
diff options
context:
space:
mode:
Diffstat (limited to 'lib/action_cable/connection')
-rw-r--r--lib/action_cable/connection/base.rb62
-rw-r--r--lib/action_cable/connection/heartbeat.rb30
-rw-r--r--lib/action_cable/connection/identification.rb16
-rw-r--r--lib/action_cable/connection/internal_channel.rb4
-rw-r--r--lib/action_cable/connection/message_buffer.rb4
-rw-r--r--lib/action_cable/connection/subscriptions.rb2
-rw-r--r--lib/action_cable/connection/tagged_logger_proxy.rb5
-rw-r--r--lib/action_cable/connection/web_socket.rb2
8 files changed, 65 insertions, 60 deletions
diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb
index 84393845c4..a629f29643 100644
--- a/lib/action_cable/connection/base.rb
+++ b/lib/action_cable/connection/base.rb
@@ -1,8 +1,8 @@
-require 'action_dispatch/http/request'
+require 'action_dispatch'
module ActionCable
module Connection
- # For every websocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
+ # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
# of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
@@ -37,8 +37,8 @@ module ActionCable
# established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
# identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
#
- # Second, we rely on the fact that the websocket connection is established with the cookies from the domain being sent along. This makes
- # it easy to use signed cookies that were set when logging in via a web interface to authorize the websocket connection.
+ # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
+ # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
#
@@ -59,19 +59,18 @@ module ActionCable
@logger = new_tagged_logger || server.logger
@websocket = ActionCable::Connection::WebSocket.new(env)
- @heartbeat = ActionCable::Connection::Heartbeat.new(self)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@started_at = Time.now
end
- # Called by the server when a new websocket connection is established. This configures the callbacks intended for overwriting by the user.
+ # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message
- if websocket.possible?
+ if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
@@ -88,19 +87,18 @@ module ActionCable
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
- logger.error "Received data without a live websocket (#{data.inspect})"
+ logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end
- # Send raw data straight back down the websocket. This is not intended to be called directly. Use the #transmit available on the
+ # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end
- # Close the websocket connection.
+ # Close the WebSocket connection.
def close
- logger.error "Closing connection"
websocket.close
end
@@ -112,12 +110,21 @@ module ActionCable
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
# This can be returned by a health check against the connection.
def statistics
- { identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
+ {
+ identifier: connection_identifier,
+ started_at: @started_at,
+ subscriptions: subscriptions.identifiers,
+ request_id: @env['action_dispatch.request_id']
+ }
+ end
+
+ def beat
+ transmit ActiveSupport::JSON.encode(identifier: '_ping', message: Time.now.to_i)
end
protected
- # The request that initiated the websocket connection is available here. This gives access to the environment, cookies, etc.
+ # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@request ||= begin
environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
@@ -125,7 +132,7 @@ module ActionCable
end
end
- # The cookies of the request that initiated the websocket connection. Useful for performing authorization checks.
+ # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
def cookies
request.cookie_jar
end
@@ -133,16 +140,15 @@ module ActionCable
private
attr_reader :websocket
- attr_reader :heartbeat, :subscriptions, :message_buffer
+ attr_reader :subscriptions, :message_buffer
def on_open
- server.add_connection(self)
-
connect if respond_to?(:connect)
subscribe_to_internal_channel
- heartbeat.start
+ beat
message_buffer.process!
+ server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
close
@@ -159,12 +165,22 @@ module ActionCable
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
- heartbeat.stop
disconnect if respond_to?(:disconnect)
end
+ def allow_request_origin?
+ return true if server.config.disable_request_forgery_protection
+
+ if Array(server.config.allowed_request_origins).include? env['HTTP_ORIGIN']
+ true
+ else
+ logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
+ false
+ end
+ end
+
def respond_to_successful_request
websocket.rack_response
end
@@ -187,17 +203,17 @@ module ActionCable
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
- websocket.possible? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '',
request.ip,
- Time.now.to_default_s ]
+ Time.now.to_s ]
end
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
- websocket.possible? ? ' [Websocket]' : '',
+ websocket.possible? ? ' [WebSocket]' : '',
request.ip,
- Time.now.to_default_s ]
+ Time.now.to_s ]
end
end
end
diff --git a/lib/action_cable/connection/heartbeat.rb b/lib/action_cable/connection/heartbeat.rb
deleted file mode 100644
index 2918938ba5..0000000000
--- a/lib/action_cable/connection/heartbeat.rb
+++ /dev/null
@@ -1,30 +0,0 @@
-module ActionCable
- module Connection
- # Websocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
- # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
- # disconnect.
- class Heartbeat
- BEAT_INTERVAL = 3
-
- def initialize(connection)
- @connection = connection
- end
-
- def start
- beat
- @timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
- end
-
- def stop
- EventMachine.cancel_timer(@timer) if @timer
- end
-
- private
- attr_reader :connection
-
- def beat
- connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
- end
- end
- end
-end
diff --git a/lib/action_cable/connection/identification.rb b/lib/action_cable/connection/identification.rb
index 701e6885ad..431493aa70 100644
--- a/lib/action_cable/connection/identification.rb
+++ b/lib/action_cable/connection/identification.rb
@@ -1,3 +1,5 @@
+require 'set'
+
module ActionCable
module Connection
module Identification
@@ -22,12 +24,22 @@ module ActionCable
# Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
def connection_identifier
- @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ unless defined? @connection_identifier
+ @connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
+ end
+
+ @connection_identifier
end
private
def connection_gid(ids)
- ids.map { |o| (o.try(:to_global_id) || o).to_s }.sort.join(":")
+ ids.map do |o|
+ if o.respond_to? :to_global_id
+ o.to_global_id
+ else
+ o.to_s
+ end
+ end.sort.join(":")
end
end
end
diff --git a/lib/action_cable/connection/internal_channel.rb b/lib/action_cable/connection/internal_channel.rb
index b00e21824c..c065a24ab7 100644
--- a/lib/action_cable/connection/internal_channel.rb
+++ b/lib/action_cable/connection/internal_channel.rb
@@ -15,14 +15,14 @@ module ActionCable
@_internal_redis_subscriptions ||= []
@_internal_redis_subscriptions << [ internal_redis_channel, callback ]
- pubsub.subscribe(internal_redis_channel, &callback)
+ EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_redis_subscriptions.present?
- @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) }
+ @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
end
end
diff --git a/lib/action_cable/connection/message_buffer.rb b/lib/action_cable/connection/message_buffer.rb
index d5a8e9eba9..25cff75b41 100644
--- a/lib/action_cable/connection/message_buffer.rb
+++ b/lib/action_cable/connection/message_buffer.rb
@@ -1,6 +1,6 @@
module ActionCable
module Connection
- # Allows us to buffer messages received from the websocket before the Connection has been fully initialized and is ready to receive them.
+ # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
# Entirely internal operation and should not be used directly by the user.
class MessageBuffer
def initialize(connection)
@@ -50,4 +50,4 @@ module ActionCable
end
end
end
-end \ No newline at end of file
+end
diff --git a/lib/action_cable/connection/subscriptions.rb b/lib/action_cable/connection/subscriptions.rb
index 69e3f60706..229be2a316 100644
--- a/lib/action_cable/connection/subscriptions.rb
+++ b/lib/action_cable/connection/subscriptions.rb
@@ -1,3 +1,5 @@
+require 'active_support/core_ext/hash/indifferent_access'
+
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
diff --git a/lib/action_cable/connection/tagged_logger_proxy.rb b/lib/action_cable/connection/tagged_logger_proxy.rb
index 854f613f1c..34063c1d42 100644
--- a/lib/action_cable/connection/tagged_logger_proxy.rb
+++ b/lib/action_cable/connection/tagged_logger_proxy.rb
@@ -4,6 +4,8 @@ module ActionCable
# ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
+ attr_reader :tags
+
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten
@@ -22,7 +24,8 @@ module ActionCable
protected
def log(type, message)
- @logger.tagged(*@tags) { @logger.send type, message }
+ current_tags = tags - @logger.formatter.current_tags
+ @logger.tagged(*current_tags) { @logger.send type, message }
end
end
end
diff --git a/lib/action_cable/connection/web_socket.rb b/lib/action_cable/connection/web_socket.rb
index 135a28cfe4..169b683b8c 100644
--- a/lib/action_cable/connection/web_socket.rb
+++ b/lib/action_cable/connection/web_socket.rb
@@ -1,3 +1,5 @@
+require 'faye/websocket'
+
module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.