diff options
Diffstat (limited to 'actioncable/lib')
35 files changed, 502 insertions, 214 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 1dc66ef3ad..68a5fff3e7 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -29,13 +29,13 @@ module ActionCable extend ActiveSupport::Autoload INTERNAL = { - identifiers: { - ping: '_ping'.freeze - }, message_types: { + welcome: 'welcome'.freeze, + ping: 'ping'.freeze, confirmation: 'confirm_subscription'.freeze, rejection: 'reject_subscription'.freeze - } + }, + default_mount_path: '/cable'.freeze } # Singleton instance of the server diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 874ebe2e71..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -32,8 +32,8 @@ module ActionCable # # == Action processing # - # Unlike subclasses of ActionController::Base, channels do not follow a REST - # constraint form for their actions. Instead, ActionCable operates through a + # Unlike subclasses of ActionController::Base, channels do not follow a RESTful + # constraint form for their actions. Instead, Action Cable operates through a # remote-procedure call model. You can declare any public method on the # channel (optionally taking a <tt>data</tt> argument), and this method is # automatically exposed as callable to the client. @@ -63,10 +63,10 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they + # In this example, the subscribed and unsubscribed methods are not callable methods, as they # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt> # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not - # callable as it's a private method. You'll see that appear accepts a data + # callable, since it's a private method. You'll see that appear accepts a data # parameter, which it then uses as part of its model call. <tt>#away</tt> # does not, since it's simply a trigger action. # @@ -125,7 +125,7 @@ module ActionCable protected # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time - # you run action_methods, they will be recalculated + # you run action_methods, they will be recalculated. def clear_action_methods! @action_methods = nil end @@ -160,15 +160,18 @@ module ActionCable action = extract_action(data) if processable_action?(action) - dispatch_action(action, data) + payload = { channel_class: self.class.name, action: action, data: data } + ActiveSupport::Notifications.instrument("perform_action.action_cable", payload) do + dispatch_action(action, data) + end else logger.error "Unable to process #{action_signature(action, data)}" end end - # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. - def unsubscribe_from_channel + def unsubscribe_from_channel # :nodoc: run_callbacks :unsubscribe do unsubscribed end @@ -183,7 +186,7 @@ module ActionCable end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking - # people as offline or the like. + # users as offline or the like. def unsubscribed # Override in subclasses end @@ -191,8 +194,12 @@ module ActionCable # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. def transmit(data, via: nil) - logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } + + payload = { channel_class: self.class.name, data: data, via: via } + ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do + connection.transmit identifier: @identifier, message: data + end end def defer_subscription_confirmation! @@ -224,7 +231,6 @@ module ActionCable end end - def subscribe_to_channel run_callbacks :subscribe do subscribed @@ -237,7 +243,6 @@ module ActionCable end end - def extract_action(data) (data['action'].presence || :receive).to_sym end @@ -267,8 +272,11 @@ module ActionCable def transmit_subscription_confirmation unless subscription_confirmation_sent? logger.info "#{self.class.name} is transmitting the subscription confirmation" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) - @subscription_confirmation_sent = true + + ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] + @subscription_confirmation_sent = true + end end end @@ -279,7 +287,10 @@ module ActionCable def transmit_subscription_rejection logger.info "#{self.class.name} is transmitting the subscription rejection" - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + + ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] + end end end end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d7..b414255707 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,7 +12,7 @@ module ActionCable end module ClassMethods - # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful + # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful # for sending a steady flow of updates to a client based off an object that was configured on subscription. # It's an alternative to using streams if the channel is able to do the work internally. def periodically(callback, every:) @@ -27,7 +27,7 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << connection.server.event_loop.timer(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 3158f30814..f654ce0bfa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -1,8 +1,8 @@ module ActionCable module Channel - # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data - # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later. + # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data + # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -18,8 +18,10 @@ module ActionCable # end # end # - # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there. - # That looks like so from that side of things: + # Based on the above example, the subscribers of this channel will get whatever data is put into the, + # let's say, `comments_for_45` broadcasting as soon as it's put there. + # + # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # @@ -37,16 +39,14 @@ module ActionCable # # CommentsChannel.broadcast_to(@post, @comment) # - # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out. - # Example below shows how you can use this to provide performance introspection in the process: + # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. + # The below example shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -69,15 +69,23 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) - # Hold off the confirmation until pubsub#subscribe is successful + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) + broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + if handler = callback || block + handler = -> message { handler.(coder.decode(message)) } if coder + else + handler = default_stream_handler(broadcasting, coder: coder) + end - Concurrent.global_io_executor.post do - pubsub.subscribe(broadcasting, callback, lambda do + streams << [ broadcasting, handler ] + + connection.server.event_loop.post do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -87,8 +95,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -106,9 +117,11 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit coder.decode(message), via: "streamed from #{broadcasting}" end end end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2..5f813cf8e0 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,6 +8,8 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel + autoload :FayeClientSocket + autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b5f898436a..604a889bb0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -2,9 +2,9 @@ require 'action_dispatch' module ActionCable module Connection - # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent - # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions - # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond + # For every WebSocket the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent + # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions + # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. # # Here's a basic example: @@ -33,9 +33,9 @@ module ActionCable # end # end # - # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections - # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many - # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. + # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections + # established for that current_user (and potentially disconnect them). You can declare as many + # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key. # # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. @@ -48,15 +48,16 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool + delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder + @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -65,8 +66,8 @@ module ActionCable end # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. - # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. - def process + # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -76,20 +77,22 @@ module ActionCable end end - # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -114,7 +117,7 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end def on_open # :nodoc: @@ -151,10 +154,18 @@ module ActionCable attr_reader :message_buffer private + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel - beat + send_welcome_message message_buffer.process! server.add_connection(self) @@ -173,6 +184,13 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def send_welcome_message + # Send welcome message to the internal connection monitor channel. + # This ensures the connection monitor state is reset after a successful + # websocket connection. + transmit type: ActionCable::INTERNAL[:message_types][:welcome] + end + def allow_request_origin? return true if server.config.disable_request_forgery_protection @@ -185,12 +203,14 @@ module ActionCable end def respond_to_successful_request + logger.info successful_request_message websocket.rack_response end def respond_to_invalid_request close if websocket.alive? + logger.error invalid_request_message logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end @@ -205,7 +225,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end @@ -213,10 +233,22 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end + + def invalid_request_message + 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end + + def successful_request_message + 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end end end end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index ef937d7c16..7d6de78582 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -29,10 +29,10 @@ module ActionCable attr_reader :env, :url - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop + def initialize(env, event_target, event_loop) + @env = env + @event_target = event_target + @event_loop = event_loop @url = ClientSocket.determine_url(@env) @@ -49,15 +49,17 @@ module ActionCable @driver.on(:close) { |e| begin_close(e.reason, e.code) } @driver.on(:error) { |e| emit_error(e.message) } - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + @stream = ActionCable::Connection::Stream.new(@event_loop, self) + end + + def start_driver + return if @driver.nil? || @driver_started + @stream.hijack_rack_socket if callback = @env['async.callback'] callback.call([101, {}, @stream]) end - end - def start_driver - return if @driver.nil? || @driver_started @driver_started = true @driver.start end @@ -69,6 +71,8 @@ module ActionCable def write(data) @stream.write(data) + rescue => e + emit_error e.message end def transmit(message) @@ -132,11 +136,8 @@ module ActionCable @ready_state = CLOSING @close_params = [reason, code] - if @stream - @stream.shutdown - else - finalize_close - end + @stream.shutdown if @stream + finalize_close end def finalize_close diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb new file mode 100644 index 0000000000..47d09a9e14 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -0,0 +1,43 @@ +require 'faye/websocket' + +module ActionCable + module Connection + class FayeClientSocket + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + + @faye = nil + end + + def alive? + @faye && @faye.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + connect + @faye.send data + end + + def close + @faye && @faye.close + end + + def rack_response + connect + @faye.rack_response + end + + private + def connect + return if @faye + @faye = Faye::WebSocket.new(@env) + + @faye.on(:open) { |event| @event_target.on_open } + @faye.on(:message) { |event| @event_target.on_message(event.data) } + @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + @faye.on(:error) { |event| @event_target.on_error(event.message) } + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb new file mode 100644 index 0000000000..9c44b38bc3 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -0,0 +1,44 @@ +require 'thread' + +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module Connection + class FayeEventLoop + @@mutex = Mutex.new + + def timer(interval, &block) + ensure_reactor_running + EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) + end + + def post(task = nil, &block) + task ||= block + + ensure_reactor_running + ::EM.next_tick(&task) + end + + private + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + + class EMTimer + def initialize(inner) + @inner = inner + end + + def shutdown + @inner.cancel + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 885ff3f102..4a54044aff 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -12,7 +12,7 @@ module ActionCable class_methods do # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. - # Common identifiers are current_user and current_account, but could be anything really. + # Common identifiers are current_user and current_account, but could be anything, really. # # Note that anything marked as an identifier will automatically create a delegate by the same name on any # channel instances created off the connection. diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..f70d52f99b 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,24 +11,22 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + server.event_loop.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } } end end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 2f65a1e84a..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -1,8 +1,7 @@ module ActionCable module Connection - # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. - # Entirely internal operation and should not be used directly by the user. - class MessageBuffer + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them. + class MessageBuffer # :nodoc: def initialize(connection) @connection = connection @buffered_messages = [] @@ -31,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -39,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index ace250cd16..0cf59091bc 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -4,15 +4,13 @@ module ActionCable # This class is heavily based on faye-websocket-ruby # # Copyright (c) 2010-2015 James Coglan - class Stream + class Stream # :nodoc: def initialize(event_loop, socket) @event_loop = event_loop @socket_object = socket @stream_send = socket.env['stream.send'] @rack_hijack_io = nil - - hijack_rack_socket end def each(&callback) @@ -31,7 +29,7 @@ module ActionCable def write(data) return @rack_hijack_io.write(data) if @rack_hijack_io return @stream_send.call(data) if @stream_send - rescue EOFError + rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end @@ -39,16 +37,16 @@ module ActionCable @socket_object.parse(data) end - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] - @event_loop.attach(@rack_hijack_io, self) - end + @event_loop.attach(@rack_hijack_io, self) + end + private def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index e6335082d2..2abad09c03 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -11,7 +11,16 @@ module ActionCable @todo = Queue.new @spawn_mutex = Mutex.new - spawn + end + + def timer(interval, &block) + Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) + end + + def post(task = nil, &block) + task ||= block + + Concurrent.global_io_executor << task end def attach(io, stream) diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index d7f95e6a62..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access' module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on - # the connection to the proper channel. Should not be used directly by the user. - class Subscriptions + # the connection to the proper channel. + class Subscriptions # :nodoc: def initialize(connection) @connection = connection @subscriptions = {} @@ -54,7 +54,7 @@ module ActionCable end def unsubscribe_from_all - subscriptions.each { |id, channel| channel.unsubscribe_from_channel } + subscriptions.each { |id, channel| remove_subscription(channel) } end protected diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..0bec9b6a96 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + def initialize(env, event_target, event_loop, client_socket_class) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil end def possible? diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index f5e233e091..7dc541d00c 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access" module ActionCable class Railtie < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new - config.action_cable.url = '/cable' + config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path] config.eager_load_namespaces << ActionCable @@ -31,8 +31,50 @@ module ActionCable self.cable = Rails.application.config_for(config_path).with_indifferent_access end + if 'ApplicationCable::Connection'.safe_constantize + self.connection_class = ApplicationCable::Connection + end + + self.channel_paths = Rails.application.paths['app/channels'].existent + options.each { |k,v| send("#{k}=", v) } end end + + initializer "action_cable.routes" do + config.after_initialize do |app| + config = app.config + unless config.action_cable.mount_path.nil? + app.routes.prepend do + mount ActionCable.server => config.action_cable.mount_path, internal: true + end + end + end + end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index a71603e61a..67adeefaff 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -8,7 +8,7 @@ module ActionCable MAJOR = 5 MINOR = 0 TINY = 0 - PRE = "beta2" + PRE = "beta3" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb index b82751468a..2081a37db6 100644 --- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb +++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb @@ -1,28 +1,39 @@ module ActionCable module Helpers module ActionCableHelper - # Returns an "action-cable-url" meta tag with the value of the url specified in your - # configuration. Ensure this is above your javascript tag: + # Returns an "action-cable-url" meta tag with the value of the URL specified in your + # configuration. Ensure this is above your JavaScript tag: # # <head> # <%= action_cable_meta_tag %> # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %> # </head> # - # This is then used by ActionCable to determine the url of your websocket server. + # This is then used by Action Cable to determine the URL of your WebSocket server. # Your CoffeeScript can then connect to the server without needing to specify the - # url directly: + # URL directly: # # #= require cable # @App = {} # App.cable = Cable.createConsumer() # - # Make sure to specify the correct server location in each of your environments - # config file: + # Make sure to specify the correct server location in each of your environment + # config files: + # + # config.action_cable.mount_path = "/cable123" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="/cable123" /> + # + # config.action_cable.url = "ws://actioncable.com" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="ws://actioncable.com" /> # - # config.action_cable.url = "ws://example.com:28080" def action_cable_meta_tag - tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url + tag "meta", name: "action-cable-url", content: ( + ActionCable.server.config.url || + ActionCable.server.config.mount_path || + raise("No Action Cable URL configured -- please configure this at config.action_cable.url") + ) end end end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index 7ec121308a..aeef8abc72 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -13,8 +13,8 @@ module ActionCable # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # # This will disconnect all the connections established for - # <tt>User.find(1)</tt> across all servers running on all machines, because - # it uses the internal channel that all these servers are subscribed to. + # <tt>User.find(1)</tt>, across all servers running on all machines, because + # it uses the internal channel that all of these servers are subscribed to. class RemoteConnections attr_reader :server diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index fe48c112df..b1a0e11631 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,11 +1,11 @@ -require 'thread' +require 'monitor' module ActionCable module Server - # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but - # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers. + # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but + # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers. # - # Also, this is the server instance used for broadcasting. See Broadcasting for details. + # Also, this is the server instance used for broadcasting. See Broadcasting for more information. class Base include ActionCable::Server::Broadcasting include ActionCable::Server::Connections @@ -18,12 +18,11 @@ module ActionCable attr_reader :mutex def initialize - @mutex = Mutex.new - - @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil + @mutex = Monitor.new + @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil end - # Called by rack to setup the server. + # Called by Rack to setup the server. def call(env) setup_heartbeat_timer config.connection_class.new(self, env).process @@ -34,21 +33,41 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + worker_pool.halt if @worker_pool + + @worker_pool = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end - def stream_event_loop - @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } + def event_loop + @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. + # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. + # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out + # at 4 worker threads by default. Tune the size yourself with config.action_cable.worker_pool_size. + # + # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. + # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database + # connections. + # + # Also, ensure that your database connection pool size is as least as large as your worker pool size. Otherwise, workers may oversubscribe + # the db connection pool and block while they wait for other workers to release their connections. Use a smaller worker pool or a larger + # db connection pool instead. def worker_pool @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end - # Requires and returns a hash of all the channel class constants keyed by name. + # Requires and returns a hash of all of the channel class constants, which are keyed by name. def channel_classes @channel_classes || @mutex.synchronize do @channel_classes ||= begin @@ -63,7 +82,7 @@ module ActionCable @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end - # All the identifiers applied to the connection class associated with this server. + # All of the identifiers applied to the connection class associated with this server. def connection_identifiers config.connection_class.identifiers end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 7e8aef45f4..8f93564113 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,6 +1,6 @@ module ActionCable module Server - # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these + # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel @@ -9,37 +9,38 @@ module ActionCable # end # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob + # # Somewhere in your app this is called, perhaps from a NewCommentJob: # ActionCable.server.broadcast \ # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications: # App.cable.subscriptions.create "WebNotificationsChannel", # received: (data) -> # new Notification data['title'], body: data['body'] module Broadcasting - # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end - # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that + # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, broadcasting) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a248933c4..0bb378cf03 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -1,31 +1,24 @@ module ActionCable module Server - # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points + # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :connection_class, :worker_pool_size - attr_accessor :channel_load_paths + attr_accessor :use_faye, :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :cable, :url + attr_accessor :cable, :url, :mount_path + + attr_accessor :channel_paths # :nodoc: def initialize @log_tags = [] - @connection_class = ApplicationCable::Connection - @worker_pool_size = 100 - - @channel_load_paths = [Rails.root.join('app/channels')] + @connection_class = ActionCable::Connection::Base + @worker_pool_size = 4 @disable_request_forgery_protection = false end - def channel_paths - @channel_paths ||= channel_load_paths.flat_map do |path| - Dir["#{path}/**/*_channel.rb"] - end - end - def channel_class_names @channel_class_names ||= channel_paths.collect do |channel_path| Pathname.new(channel_path).basename.to_s.split('.').first.camelize @@ -50,6 +43,22 @@ module ActionCable adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize end + + def event_loop_class + if use_faye + ActionCable::Connection::FayeEventLoop + else + ActionCable::Connection::StreamEventLoop + end + end + + def client_socket_class + if use_faye + ActionCable::Connection::FayeClientSocket + else + ActionCable::Connection::ClientSocket + end + end end end end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebd..5e61b4e335 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -1,9 +1,8 @@ module ActionCable module Server - # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so - # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. - # As such, this is primarily for internal use. - module Connections + # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so + # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that. + module Connections # :nodoc: BEAT_INTERVAL = 3 def connections @@ -19,12 +18,12 @@ module ActionCable end # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do + event_loop.post { connections.map(&:beat) } + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 3b6c6d44a1..49cbaec0c0 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -4,8 +4,8 @@ require 'concurrent' module ActionCable module Server - # Worker used by Server.send_async to do connection work in threads. Only for internal use. - class Worker + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: include ActiveSupport::Callbacks thread_mattr_accessor :connection @@ -20,6 +20,26 @@ module ActionCable ) end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @pool.kill + end + + def stopping? + @pool.shuttingdown? + end + + def work(connection) + self.connection = connection + + run_callbacks :work do + yield + end + ensure + self.connection = nil + end + def async_invoke(receiver, method, *args) @pool.post do invoke(receiver, method, *args) @@ -27,19 +47,15 @@ module ActionCable end def invoke(receiver, method, *args) - begin - self.connection = receiver - - run_callbacks :work do + work(receiver) do + begin receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end end end @@ -50,14 +66,8 @@ module ActionCable end def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection - - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - ensure - self.connection = nil + work(channel.connection) do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index ecece4e270..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,10 +12,8 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index cca6894289..10b3ac8cd8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -5,16 +5,21 @@ module ActionCable class Async < Inline # :nodoc: private def new_subscriber_map - AsyncSubscriberMap.new + AsyncSubscriberMap.new(server.event_loop) end class AsyncSubscriberMap < SubscriberMap + def initialize(event_loop) + @event_loop = event_loop + super() + end + def add_subscriber(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index d697548cbd..256876cf30 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -13,6 +13,14 @@ module ActionCable class EventedRedis < Base # :nodoc: @@mutex = Mutex.new + # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil @@ -41,9 +49,9 @@ module ActionCable def redis_connection_for_subscriptions ensure_reactor_running @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @logger.error "[ActionCable] Redis reconnect failed." end end end @@ -51,7 +59,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index abaeb92e54..66c7852f6e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -42,14 +42,15 @@ module ActionCable private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -68,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -98,7 +99,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 7076383efe..65434f7107 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -6,6 +6,10 @@ require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @listener = nil @@ -29,25 +33,30 @@ module ActionCable end def redis_connection_for_subscriptions - ::Redis.new(@server.config.cable) + redis_connection end private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= redis_connection end end + def redis_connection + self.class.redis_connector.call(@server.config.cable) + end + class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @@ -76,7 +85,7 @@ module ActionCable if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift - Concurrent.global_io_executor << next_callback if next_callback + @event_loop.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end @@ -125,7 +134,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end private diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE index 27a934c689..6249553c22 100644 --- a/actioncable/lib/rails/generators/channel/USAGE +++ b/actioncable/lib/rails/generators/channel/USAGE @@ -3,7 +3,7 @@ Description: Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript). Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments. - Note: Turn on the cable connection in app/assets/javascript/cable.coffee after generating any channels. + Note: Turn on the cable connection in app/assets/javascript/cable.js after generating any channels. Example: ======== diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index c5d398810a..d89ab45816 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -15,11 +15,28 @@ module Rails if options[:assets] template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee") end + + generate_application_cable_files end protected def file_name - @_file_name ||= super.gsub(/\_channel/i, '') + @_file_name ||= super.gsub(/_channel/i, '') + end + + # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. + def generate_application_cable_files + return if self.behavior != :invoke + + files = [ + 'application_cable/channel.rb', + 'application_cable/connection.rb' + ] + + files.each do |name| + path = File.join('app/channels/', name) + template(name, path) if !File.exist?(path) + end end end end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb new file mode 100644 index 0000000000..d56fa30f4d --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb @@ -0,0 +1,5 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. +module ApplicationCable + class Channel < ActionCable::Channel::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb new file mode 100644 index 0000000000..b4f41389ad --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb @@ -0,0 +1,5 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. +module ApplicationCable + class Connection < ActionCable::Connection::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 6cf04ee61f..7bff3341c1 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,4 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading. +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed |