1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
|
require 'action_dispatch'
module ActionCable
module Connection
# For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
# of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
# identified_by :current_user
#
# def connect
# self.current_user = find_verified_user
# logger.add_tags current_user.name
# end
#
# def disconnect
# # Any cleanup work needed when the cable connection is cut.
# end
#
# protected
# def find_verified_user
# if current_user = User.find_by_identity cookies.signed[:identity_id]
# current_user
# else
# reject_unauthorized_connection
# end
# end
# end
# end
#
# First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
# established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
# identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
include Identification
include InternalChannel
include Authorization
attr_reader :server, :env, :subscriptions, :logger
delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@_internal_subscriptions = nil
@started_at = Time.now
end
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message
if websocket.possible? && allow_request_origin?
respond_to_successful_request
else
respond_to_invalid_request
end
end
# Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
# The data is routed to the proper channel that the connection has subscribed to.
def receive(data_in_json)
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end
# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end
# Close the WebSocket connection.
def close
websocket.close
end
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
worker_pool.async_invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
# This can be returned by a health check against the connection.
def statistics
{
identifier: connection_identifier,
started_at: @started_at,
subscriptions: subscriptions.identifiers,
request_id: @env['action_dispatch.request_id']
}
end
def beat
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
def on_open # :nodoc:
send_async :handle_open
end
def on_message(message) # :nodoc:
message_buffer.append message
end
def on_error(message) # :nodoc:
# ignore
end
def on_close(reason, code) # :nodoc:
send_async :handle_close
end
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@request ||= begin
environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
ActionDispatch::Request.new(environment || env)
end
end
# The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
def cookies
request.cookie_jar
end
attr_reader :websocket
attr_reader :message_buffer
private
def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
message_buffer.process!
server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
end
def handle_close
logger.info finished_request_message
server.remove_connection(self)
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
disconnect if respond_to?(:disconnect)
end
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] }
true
else
logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
false
end
end
def respond_to_successful_request
logger.info successful_request_message
websocket.rack_response
end
def respond_to_invalid_request
close if websocket.alive?
logger.error invalid_request_message
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
# Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
def new_tagged_logger
TaggedLoggerProxy.new server.logger,
tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end
def started_request_message
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]',
request.ip,
Time.now.to_s ]
end
def invalid_request_message
'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
]
end
def successful_request_message
'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [
env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"]
]
end
end
end
end
|