1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
|
# frozen_string_literal: true
require "action_dispatch"
module ActionCable
module Connection
# For every WebSocket connection the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent
# of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
# identified_by :current_user
#
# def connect
# self.current_user = find_verified_user
# logger.add_tags current_user.name
# end
#
# def disconnect
# # Any cleanup work needed when the cable connection is cut.
# end
#
# private
# def find_verified_user
# User.find_by_identity(cookies.encrypted[:identity_id]) ||
# reject_unauthorized_connection
# end
# end
# end
#
# First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections
# established for that current_user (and potentially disconnect them). You can declare as many
# identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with the name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
include Identification
include InternalChannel
include Authorization
attr_reader :server, :env, :subscriptions, :logger, :worker_pool, :protocol
delegate :event_loop, :pubsub, to: :server
def initialize(server, env, coder: ActiveSupport::JSON)
@server, @env, @coder = server, env, coder
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@_internal_subscriptions = nil
@started_at = Time.now
end
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
def process #:nodoc:
logger.info started_request_message
if websocket.possible? && allow_request_origin?
respond_to_successful_request
else
respond_to_invalid_request
end
end
# Decodes WebSocket messages and dispatches them to subscribed channels.
# WebSocket message transfer encoding is always JSON.
def receive(websocket_message) #:nodoc:
send_async :dispatch_websocket_message, websocket_message
end
def dispatch_websocket_message(websocket_message) #:nodoc:
if websocket.alive?
subscriptions.execute_command decode(websocket_message)
else
logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
end
end
def transmit(cable_message) # :nodoc:
websocket.transmit encode(cable_message)
end
# Close the WebSocket connection.
def close(reason: nil, reconnect: true)
transmit(
type: ActionCable::INTERNAL[:message_types][:disconnect],
reason: reason,
reconnect: reconnect
)
websocket.close
end
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
worker_pool.async_invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>.
# This can be returned by a health check against the connection.
def statistics
{
identifier: connection_identifier,
started_at: @started_at,
subscriptions: subscriptions.identifiers,
request_id: @env["action_dispatch.request_id"]
}
end
def beat
transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
end
def on_open # :nodoc:
send_async :handle_open
end
def on_message(message) # :nodoc:
message_buffer.append message
end
def on_error(message) # :nodoc:
# log errors to make diagnosing socket errors easier
logger.error "WebSocket error occurred: #{message}"
end
def on_close(reason, code) # :nodoc:
send_async :handle_close
end
private
attr_reader :websocket
attr_reader :message_buffer
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request # :doc:
@request ||= begin
environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
ActionDispatch::Request.new(environment || env)
end
end
# The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
def cookies # :doc:
request.cookie_jar
end
def encode(cable_message)
@coder.encode cable_message
end
def decode(websocket_message)
@coder.decode websocket_message
end
def handle_open
@protocol = websocket.protocol
connect if respond_to?(:connect)
subscribe_to_internal_channel
send_welcome_message
message_buffer.process!
server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive?
end
def handle_close
logger.info finished_request_message
server.remove_connection(self)
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
disconnect if respond_to?(:disconnect)
end
def send_welcome_message
# Send welcome message to the internal connection monitor channel.
# This ensures the connection monitor state is reset after a successful
# websocket connection.
transmit type: ActionCable::INTERNAL[:message_types][:welcome]
end
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
proto = Rack::Request.new(env).ssl? ? "https" : "http"
if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}"
true
elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] }
true
else
logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
false
end
end
def respond_to_successful_request
logger.info successful_request_message
websocket.rack_response
end
def respond_to_invalid_request
close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive?
logger.error invalid_request_message
logger.info finished_request_message
[ 404, { "Content-Type" => "text/plain" }, [ "Page not found" ] ]
end
# Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
def new_tagged_logger
TaggedLoggerProxy.new server.logger,
tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end
def started_request_message
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
request.ip,
Time.now.to_s ]
end
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
websocket.possible? ? " [WebSocket]" : "[non-WebSocket]",
request.ip,
Time.now.to_s ]
end
def invalid_request_message
"Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
]
end
def successful_request_message
"Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)" % [
env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
]
end
end
end
end
|