aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/connection/base.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/connection/base.rb')
-rw-r--r--actioncable/lib/action_cable/connection/base.rb43
1 files changed, 22 insertions, 21 deletions
diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb
index f7b18a85ae..0016d1a1a4 100644
--- a/actioncable/lib/action_cable/connection/base.rb
+++ b/actioncable/lib/action_cable/connection/base.rb
@@ -48,21 +48,19 @@ module ActionCable
include InternalChannel
include Authorization
- attr_reader :server, :env, :subscriptions
- delegate :worker_pool, :pubsub, to: :server
-
- attr_reader :logger
+ attr_reader :server, :env, :subscriptions, :logger
+ delegate :stream_event_loop, :worker_pool, :pubsub, to: :server
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
- @websocket = ActionCable::Connection::WebSocket.new(env)
+ @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
- @_internal_redis_subscriptions = nil
+ @_internal_subscriptions = nil
@started_at = Time.now
end
@@ -72,10 +70,6 @@ module ActionCable
logger.info started_request_message
if websocket.possible? && allow_request_origin?
- websocket.on(:open) { |event| send_async :on_open }
- websocket.on(:message) { |event| on_message event.data }
- websocket.on(:close) { |event| send_async :on_close }
-
respond_to_successful_request
else
respond_to_invalid_request
@@ -105,7 +99,7 @@ module ActionCable
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
- worker_pool.async.invoke(self, method, *arguments)
+ worker_pool.async_invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
@@ -123,6 +117,21 @@ module ActionCable
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
+ def on_open # :nodoc:
+ send_async :handle_open
+ end
+
+ def on_message(message) # :nodoc:
+ message_buffer.append message
+ end
+
+ def on_error(message) # :nodoc:
+ # ignore
+ end
+
+ def on_close # :nodoc:
+ send_async :handle_close
+ end
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
@@ -138,13 +147,11 @@ module ActionCable
request.cookie_jar
end
-
- protected
attr_reader :websocket
attr_reader :message_buffer
private
- def on_open
+ def handle_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
@@ -155,11 +162,7 @@ module ActionCable
respond_to_invalid_request
end
- def on_message(message)
- message_buffer.append message
- end
-
- def on_close
+ def handle_close
logger.info finished_request_message
server.remove_connection(self)
@@ -170,7 +173,6 @@ module ActionCable
disconnect if respond_to?(:disconnect)
end
-
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
@@ -193,7 +195,6 @@ module ActionCable
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
-
# Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
def new_tagged_logger
TaggedLoggerProxy.new server.logger,