diff options
Diffstat (limited to 'actioncable/lib')
44 files changed, 1019 insertions, 568 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 1dc66ef3ad..68a5fff3e7 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -29,13 +29,13 @@ module ActionCable extend ActiveSupport::Autoload INTERNAL = { - identifiers: { - ping: '_ping'.freeze - }, message_types: { + welcome: 'welcome'.freeze, + ping: 'ping'.freeze, confirmation: 'confirm_subscription'.freeze, rejection: 'reject_subscription'.freeze - } + }, + default_mount_path: '/cable'.freeze } # Singleton instance of the server diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 88cdc1cab1..714d9887d4 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -32,8 +32,11 @@ module ActionCable # # == Action processing # - # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can - # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client. + # Unlike subclasses of ActionController::Base, channels do not follow a RESTful + # constraint form for their actions. Instead, Action Cable operates through a + # remote-procedure call model. You can declare any public method on the + # channel (optionally taking a <tt>data</tt> argument), and this method is + # automatically exposed as callable to the client. # # Example: # @@ -60,18 +63,22 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away - # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then - # uses as part of its model call. #away does not, it's simply a trigger action. + # In this example, the subscribed and unsubscribed methods are not callable methods, as they + # were already declared in ActionCable::Channel::Base, but <tt>#appear</tt> + # and <tt>#away</tt> are. <tt>#generate_connection_token</tt> is also not + # callable, since it's a private method. You'll see that appear accepts a data + # parameter, which it then uses as part of its model call. <tt>#away</tt> + # does not, since it's simply a trigger action. # - # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. - # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # Also note that in this example, <tt>current_user</tt> is available because + # it was marked as an identifying attribute on the connection. All such + # identifiers will automatically create a delegation method of the same name + # on the channel instance. # # == Rejecting subscription requests # - # A channel can reject a subscription request in the #subscribed callback by invoking #reject! - # - # Example: + # A channel can reject a subscription request in the #subscribed callback by + # invoking the #reject method: # # class ChatChannel < ApplicationCable::Channel # def subscribed @@ -80,8 +87,10 @@ module ActionCable # end # end # - # In this example, the subscription will be rejected if the current_user does not have access to the chat room. - # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. + # In this example, the subscription will be rejected if the + # <tt>current_user</tt> does not have access to the chat room. On the + # client-side, the <tt>Channel#rejected</tt> callback will get invoked when + # the server rejects the subscription request. class Base include Callbacks include PeriodicTimers @@ -116,7 +125,7 @@ module ActionCable protected # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time - # you run action_methods, they will be recalculated + # you run action_methods, they will be recalculated. def clear_action_methods! @action_methods = nil end @@ -157,9 +166,9 @@ module ActionCable end end - # Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks. + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. - def unsubscribe_from_channel + def unsubscribe_from_channel # :nodoc: run_callbacks :unsubscribe do unsubscribed end @@ -174,7 +183,7 @@ module ActionCable end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking - # people as offline or the like. + # users as offline or the like. def unsubscribed # Override in subclasses end @@ -182,7 +191,7 @@ module ActionCable # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. def transmit(data, via: nil) - logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via } + logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) end @@ -215,7 +224,6 @@ module ActionCable end end - def subscribe_to_channel run_callbacks :subscribe do subscribed @@ -228,7 +236,6 @@ module ActionCable end end - def extract_action(data) (data['action'].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..b414255707 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -12,7 +12,7 @@ module ActionCable end module ClassMethods - # Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful + # Allows you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful # for sending a steady flow of updates to a client based off an object that was configured on subscription. # It's an alternative to using streams if the channel is able to do the work internally. def periodically(callback, every:) @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do + active_periodic_timers << connection.server.event_loop.timer(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.cancel } + active_periodic_timers.each { |timer| timer.shutdown } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e2876ef6fa..23d7320a28 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -1,8 +1,8 @@ module ActionCable module Channel - # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data - # put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not - # streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later. + # Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pubsub queue where any data + # placed into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not + # streaming a broadcasting at the very moment it sends out an update, you will not get that update, if you connect after it has been sent. # # Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between # the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new @@ -18,8 +18,10 @@ module ActionCable # end # end # - # So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there. - # That looks like so from that side of things: + # Based on the above example, the subscribers of this channel will get whatever data is put into the, + # let's say, `comments_for_45` broadcasting as soon as it's put there. + # + # An example broadcasting for this channel looks like so: # # ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell' # @@ -37,26 +39,27 @@ module ActionCable # # CommentsChannel.broadcast_to(@post, @comment) # - # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out. - # Example below shows how you can use this to provide performance introspection in the process: + # If you don't just want to parlay the broadcast unfiltered to the subscriber, you can also supply a callback that lets you alter what is sent out. + # The below example shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel - # def subscribed - # @room = Chat::Room[params[:room_number]] + # def subscribed + # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) + # stream_for @room, -> (encoded_message) do + # message = ActiveSupport::JSON.decode(encoded_message) # - # if message['originated_at'].present? - # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) + # if message['originated_at'].present? + # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # - # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing - # logger.info "Message took #{elapsed_time}s to arrive" - # end + # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing + # logger.info "Message took #{elapsed_time}s to arrive" + # end # - # transmit message - # end - # end + # transmit message + # end + # end + # end # # You can stop streaming from all broadcasts by calling #stop_all_streams. module Streams @@ -69,13 +72,14 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. def stream_from(broadcasting, callback = nil) - # Hold off the confirmation until pubsub#subscribe is successful + broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick do + connection.server.event_loop.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" @@ -90,6 +94,7 @@ module ActionCable stream_from(broadcasting_for([ channel_name, model ]), callback) end + # Unsubscribes all streams associated with this channel from the pubsub queue. def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index b672e00682..5f813cf8e0 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,12 +5,17 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base + autoload :ClientSocket autoload :Identification autoload :InternalChannel + autoload :FayeClientSocket + autoload :FayeEventLoop autoload :MessageBuffer - autoload :WebSocket + autoload :Stream + autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..b4488265cb 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -2,9 +2,9 @@ require 'action_dispatch' module ActionCable module Connection - # For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent - # of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions - # based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond + # For every WebSocket the Action Cable server accepts, a Connection object will be instantiated. This instance becomes the parent + # of all of the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions + # based on an identifier sent by the Action Cable consumer. The Connection itself does not deal with any specific application logic beyond # authentication and authorization. # # Here's a basic example: @@ -33,9 +33,9 @@ module ActionCable # end # end # - # First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections - # established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many - # identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key. + # First, we declare that this connection can be identified by its current_user. This allows us to later be able to find all connections + # established for that current_user (and potentially disconnect them). You can declare as many + # identification indexes as you like. Declaring an identification means that an attr_accessor is automatically set for that key. # # Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes # it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection. @@ -48,15 +48,16 @@ module ActionCable include InternalChannel include Authorization - attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + attr_reader :server, :env, :subscriptions, :logger, :worker_pool + delegate :event_loop, :pubsub, to: :server def initialize(server, env) @server, @env = server, env + @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -65,22 +66,18 @@ module ActionCable end # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. - # This method should not be called directly. Rely on the #connect (and #disconnect) callback instead. - def process + # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. + def process # :nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? - websocket.on(:open) { |event| send_async :on_open } - websocket.on(:message) { |event| on_message event.data } - websocket.on(:close) { |event| send_async :on_close } - respond_to_successful_request else respond_to_invalid_request end end - # Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded. + # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. # The data is routed to the proper channel that the connection has subscribed to. def receive(data_in_json) if websocket.alive? @@ -92,7 +89,7 @@ module ActionCable # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) + def transmit(data) # :nodoc: websocket.transmit data end @@ -118,7 +115,23 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) + transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i) + end + + def on_open # :nodoc: + send_async :handle_open + end + + def on_message(message) # :nodoc: + message_buffer.append message + end + + def on_error(message) # :nodoc: + # ignore + end + + def on_close(reason, code) # :nodoc: + send_async :handle_close end protected @@ -139,10 +152,10 @@ module ActionCable attr_reader :message_buffer private - def on_open + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel - beat + send_welcome_message message_buffer.process! server.add_connection(self) @@ -150,11 +163,7 @@ module ActionCable respond_to_invalid_request end - def on_message(message) - message_buffer.append message - end - - def on_close + def handle_close logger.info finished_request_message server.remove_connection(self) @@ -165,6 +174,13 @@ module ActionCable disconnect if respond_to?(:disconnect) end + def send_welcome_message + # Send welcome message to the internal connection monitor channel. + # This ensures the connection monitor state is reset after a successful + # websocket connection. + transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome]) + end + def allow_request_origin? return true if server.config.disable_request_forgery_protection @@ -177,12 +193,14 @@ module ActionCable end def respond_to_successful_request + logger.info successful_request_message websocket.rack_response end def respond_to_invalid_request close if websocket.alive? + logger.error invalid_request_message logger.info finished_request_message [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ] end @@ -197,7 +215,7 @@ module ActionCable 'Started %s "%s"%s for %s at %s' % [ request.request_method, request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end @@ -205,10 +223,22 @@ module ActionCable def finished_request_message 'Finished "%s"%s for %s at %s' % [ request.filtered_path, - websocket.possible? ? ' [WebSocket]' : '', + websocket.possible? ? ' [WebSocket]' : '[non-WebSocket]', request.ip, Time.now.to_s ] end + + def invalid_request_message + 'Failed to upgrade to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end + + def successful_request_message + 'Successfully upgraded to WebSocket (REQUEST_METHOD: %s, HTTP_CONNECTION: %s, HTTP_UPGRADE: %s)' % [ + env["REQUEST_METHOD"], env["HTTP_CONNECTION"], env["HTTP_UPGRADE"] + ] + end end end end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb new file mode 100644 index 0000000000..9e4dbcd6e6 --- /dev/null +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -0,0 +1,149 @@ +require 'websocket/driver' + +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class ClientSocket # :nodoc: + def self.determine_url(env) + scheme = secure_request?(env) ? 'wss:' : 'ws:' + "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" + end + + def self.secure_request?(env) + return true if env['HTTPS'] == 'on' + return true if env['HTTP_X_FORWARDED_SSL'] == 'on' + return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' + return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' + return true if env['rack.url_scheme'] == 'https' + + return false + end + + CONNECTING = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 + + attr_reader :env, :url + + def initialize(env, event_target, event_loop) + @env = env + @event_target = event_target + @event_loop = event_loop + + @url = ClientSocket.determine_url(@env) + + @driver = @driver_started = nil + @close_params = ['', 1006] + + @ready_state = CONNECTING + + # The driver calls +env+, +url+, and +write+ + @driver = ::WebSocket::Driver.rack(self) + + @driver.on(:open) { |e| open } + @driver.on(:message) { |e| receive_message(e.data) } + @driver.on(:close) { |e| begin_close(e.reason, e.code) } + @driver.on(:error) { |e| emit_error(e.message) } + + @stream = ActionCable::Connection::Stream.new(@event_loop, self) + end + + def start_driver + return if @driver.nil? || @driver_started + @stream.hijack_rack_socket + + if callback = @env['async.callback'] + callback.call([101, {}, @stream]) + end + + @driver_started = true + @driver.start + end + + def rack_response + start_driver + [ -1, {}, [] ] + end + + def write(data) + @stream.write(data) + end + + def transmit(message) + return false if @ready_state > OPEN + case message + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end + end + + def close(code = nil, reason = nil) + code ||= 1000 + reason ||= '' + + unless code == 1000 or (code >= 3000 and code <= 4999) + raise ArgumentError, "Failed to execute 'close' on WebSocket: " + + "The code must be either 1000, or between 3000 and 4999. " + + "#{code} is neither." + end + + @ready_state = CLOSING unless @ready_state == CLOSED + @driver.close(reason, code) + end + + def parse(data) + @driver.parse(data) + end + + def client_gone + finalize_close + end + + def alive? + @ready_state == OPEN + end + + private + def open + return unless @ready_state == CONNECTING + @ready_state = OPEN + + @event_target.on_open + end + + def receive_message(data) + return unless @ready_state == OPEN + + @event_target.on_message(data) + end + + def emit_error(message) + return if @ready_state >= CLOSING + + @event_target.on_error(message) + end + + def begin_close(reason, code) + return if @ready_state == CLOSED + @ready_state = CLOSING + @close_params = [reason, code] + + @stream.shutdown if @stream + finalize_close + end + + def finalize_close + return if @ready_state == CLOSED + @ready_state = CLOSED + + @event_target.on_close(*@close_params) + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb new file mode 100644 index 0000000000..c9139b6858 --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_client_socket.rb @@ -0,0 +1,42 @@ +require 'faye/websocket' + +module ActionCable + module Connection + class FayeClientSocket + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + + @faye = nil + end + + def alive? + @faye && @faye.ready_state == Faye::WebSocket::API::OPEN + end + + def transmit(data) + connect + @faye.send data + end + + def close + @faye && @faye.close + end + + def rack_response + connect + @faye.rack_response + end + + private + def connect + return if @faye + @faye = Faye::WebSocket.new(@env) + + @faye.on(:open) { |event| @event_target.on_open } + @faye.on(:message) { |event| @event_target.on_message(event.data) } + @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb new file mode 100644 index 0000000000..8b70f3d84e --- /dev/null +++ b/actioncable/lib/action_cable/connection/faye_event_loop.rb @@ -0,0 +1,44 @@ +require 'thread' + +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module Connection + class FayeEventLoop + @@mutex = Mutex.new + + def timer(interval, &block) + ensure_reactor_running + EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) + end + + def post(task = nil, &block) + task ||= block + + ensure_reactor_running + ::EM.next_tick(&task) + end + + private + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + + class EMTimer + def initialize(inner) + @inner = inner + end + + def shutdown + inner.cancel + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/identification.rb b/actioncable/lib/action_cable/connection/identification.rb index 885ff3f102..4a54044aff 100644 --- a/actioncable/lib/action_cable/connection/identification.rb +++ b/actioncable/lib/action_cable/connection/identification.rb @@ -12,7 +12,7 @@ module ActionCable class_methods do # Mark a key as being a connection identifier index that can then be used to find the specific connection again later. - # Common identifiers are current_user and current_account, but could be anything really. + # Common identifiers are current_user and current_account, but could be anything, really. # # Note that anything marked as an identifier will automatically create a delegate by the same name on any # channel instances created off the connection. diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 54ed7672d2..3c5d39f59a 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, callback) } + server.event_loop.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| server.event_loop.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 2f65a1e84a..19f2e6e918 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -1,8 +1,7 @@ module ActionCable module Connection - # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them. - # Entirely internal operation and should not be used directly by the user. - class MessageBuffer + # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized, and is ready to receive them. + class MessageBuffer # :nodoc: def initialize(connection) @connection = connection @buffered_messages = [] diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb new file mode 100644 index 0000000000..2d97b28c09 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -0,0 +1,57 @@ +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class Stream # :nodoc: + def initialize(event_loop, socket) + @event_loop = event_loop + @socket_object = socket + @stream_send = socket.env['stream.send'] + + @rack_hijack_io = nil + end + + def each(&callback) + @stream_send ||= callback + end + + def close + shutdown + @socket_object.client_gone + end + + def shutdown + clean_rack_hijack + end + + def write(data) + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + rescue EOFError + @socket_object.client_gone + end + + def receive(data) + @socket_object.parse(data) + end + + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] + + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] + + @event_loop.attach(@rack_hijack_io, self) + end + + private + def clean_rack_hijack + return unless @rack_hijack_io + @event_loop.detach(@rack_hijack_io, self) + @rack_hijack_io = nil + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb new file mode 100644 index 0000000000..2abad09c03 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -0,0 +1,103 @@ +require 'nio' +require 'thread' + +module ActionCable + module Connection + class StreamEventLoop + def initialize + @nio = @thread = nil + @map = {} + @stopping = false + @todo = Queue.new + + @spawn_mutex = Mutex.new + end + + def timer(interval, &block) + Concurrent::TimerTask.new(execution_interval: interval, &block).tap(&:execute) + end + + def post(task = nil, &block) + task ||= block + + Concurrent.global_io_executor << task + end + + def attach(io, stream) + @todo << lambda do + @map[io] = stream + @nio.register(io, :r) + end + wakeup + end + + def detach(io, stream) + @todo << lambda do + @nio.deregister io + @map.delete io + end + wakeup + end + + def stop + @stopping = true + wakeup if @nio + end + + private + def spawn + return if @thread && @thread.status + + @spawn_mutex.synchronize do + return if @thread && @thread.status + + @nio ||= NIO::Selector.new + @thread = Thread.new { run } + + return true + end + end + + def wakeup + spawn || @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + next unless monitors = @nio.select + + monitors.each do |monitor| + io = monitor.io + stream = @map[io] + + begin + stream.receive io.read_nonblock(4096) + rescue IO::WaitReadable + next + rescue + # We expect one of EOFError or Errno::ECONNRESET in + # normal operation (when the client goes away). But if + # anything else goes wrong, this is still the best way + # to handle it. + begin + stream.close + rescue + @nio.deregister io + @map.delete io + end + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index d7f95e6a62..5aa907c2d3 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -3,8 +3,8 @@ require 'active_support/core_ext/hash/indifferent_access' module ActionCable module Connection # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on - # the connection to the proper channel. Should not be used directly by the user. - class Subscriptions + # the connection to the proper channel. + class Subscriptions # :nodoc: def initialize(connection) @connection = connection @subscriptions = {} @@ -23,13 +23,13 @@ module ActionCable end def add(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + id_options = decode_hash(data['identifier']) + identifier = normalize_identifier(id_options) subscription_klass = connection.server.channel_classes[id_options[:channel]] if subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options) else logger.error "Subscription class not found (#{data.inspect})" end @@ -37,7 +37,7 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[data['identifier']] + remove_subscription subscriptions[normalize_identifier(data['identifier'])] end def remove_subscription(subscription) @@ -46,7 +46,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action ActiveSupport::JSON.decode(data['data']) + find(data).perform_action(decode_hash(data['data'])) end def identifiers @@ -54,7 +54,7 @@ module ActionCable end def unsubscribe_from_all - subscriptions.each { |id, channel| channel.unsubscribe_from_channel } + subscriptions.each { |id, channel| remove_subscription(channel) } end protected @@ -63,8 +63,21 @@ module ActionCable private delegate :logger, to: :connection + def normalize_identifier(identifier) + identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash) + identifier + end + + # If `data` is a Hash, this means that the original JSON + # sent by the client had no backslashes in it, and does + # not need to be decoded again. + def decode_hash(data) + data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash) + data.with_indifferent_access + end + def find(data) - if subscription = subscriptions[data['identifier']] + if subscription = subscriptions[normalize_identifier(data['identifier'])] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 670d5690ae..0bec9b6a96 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,13 +1,11 @@ -require 'faye/websocket' +require 'websocket/driver' module ActionCable module Connection - # Decorate the Faye::WebSocket with helpers we need. + # Wrap the real socket to minimize the externally-presented API class WebSocket - delegate :rack_response, :close, :on, to: :websocket - - def initialize(env) - @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + def initialize(env, event_target, event_loop, client_socket_class) + @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop) : nil end def possible? @@ -15,11 +13,19 @@ module ActionCable end def alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.alive? end def transmit(data) - websocket.send data + websocket.transmit data + end + + def close + websocket.close + end + + def rack_response + websocket.rack_response end protected diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index f5e233e091..7dc541d00c 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -6,7 +6,7 @@ require "active_support/core_ext/hash/indifferent_access" module ActionCable class Railtie < Rails::Engine # :nodoc: config.action_cable = ActiveSupport::OrderedOptions.new - config.action_cable.url = '/cable' + config.action_cable.mount_path = ActionCable::INTERNAL[:default_mount_path] config.eager_load_namespaces << ActionCable @@ -31,8 +31,50 @@ module ActionCable self.cable = Rails.application.config_for(config_path).with_indifferent_access end + if 'ApplicationCable::Connection'.safe_constantize + self.connection_class = ApplicationCable::Connection + end + + self.channel_paths = Rails.application.paths['app/channels'].existent + options.each { |k,v| send("#{k}=", v) } end end + + initializer "action_cable.routes" do + config.after_initialize do |app| + config = app.config + unless config.action_cable.mount_path.nil? + app.routes.prepend do + mount ActionCable.server => config.action_cable.mount_path, internal: true + end + end + end + end + + initializer "action_cable.set_work_hooks" do |app| + ActiveSupport.on_load(:action_cable) do + ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner| + app.executor.wrap do + # If we took a while to get the lock, we may have been halted + # in the meantime. As we haven't started doing any real work + # yet, we should pretend that we never made it off the queue. + unless stopping? + inner.call + end + end + end + + wrap = lambda do |_, inner| + app.executor.wrap(&inner) + end + ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap + ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap + + app.reloader.before_class_unload do + ActionCable.server.restart + end + end + end end end diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index c652fb91ae..67adeefaff 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -8,7 +8,7 @@ module ActionCable MAJOR = 5 MINOR = 0 TINY = 0 - PRE = "beta1.1" + PRE = "beta3" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb index b82751468a..2081a37db6 100644 --- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb +++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb @@ -1,28 +1,39 @@ module ActionCable module Helpers module ActionCableHelper - # Returns an "action-cable-url" meta tag with the value of the url specified in your - # configuration. Ensure this is above your javascript tag: + # Returns an "action-cable-url" meta tag with the value of the URL specified in your + # configuration. Ensure this is above your JavaScript tag: # # <head> # <%= action_cable_meta_tag %> # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %> # </head> # - # This is then used by ActionCable to determine the url of your websocket server. + # This is then used by Action Cable to determine the URL of your WebSocket server. # Your CoffeeScript can then connect to the server without needing to specify the - # url directly: + # URL directly: # # #= require cable # @App = {} # App.cable = Cable.createConsumer() # - # Make sure to specify the correct server location in each of your environments - # config file: + # Make sure to specify the correct server location in each of your environment + # config files: + # + # config.action_cable.mount_path = "/cable123" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="/cable123" /> + # + # config.action_cable.url = "ws://actioncable.com" + # <%= action_cable_meta_tag %> would render: + # => <meta name="action-cable-url" content="ws://actioncable.com" /> # - # config.action_cable.url = "ws://example.com:28080" def action_cable_meta_tag - tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url + tag "meta", name: "action-cable-url", content: ( + ActionCable.server.config.url || + ActionCable.server.config.mount_path || + raise("No Action Cable URL configured -- please configure this at config.action_cable.url") + ) end end end diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb deleted file mode 100644 index dce637b3ca..0000000000 --- a/actioncable/lib/action_cable/process/logging.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'action_cable/server' -require 'eventmachine' - -EM.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") -end diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index aa2fc95d2f..aeef8abc72 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -1,6 +1,7 @@ module ActionCable - # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by - # searching the identifier declared on the connection. Example: + # If you need to disconnect a given connection, you can go through the + # RemoteConnections. You can find the connections you're looking for by + # searching for the identifier declared on the connection. For example: # # module ApplicationCable # class Connection < ActionCable::Connection::Base @@ -11,8 +12,9 @@ module ActionCable # # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # - # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses - # the internal channel that all these servers are subscribed to). + # This will disconnect all the connections established for + # <tt>User.find(1)</tt>, across all servers running on all machines, because + # it uses the internal channel that all of these servers are subscribed to. class RemoteConnections attr_reader :server @@ -25,7 +27,7 @@ module ActionCable end private - # Represents a single remote connection found via ActionCable.server.remote_connections.where(*). + # Represents a single remote connection found via <tt>ActionCable.server.remote_connections.where(*)</tt>. # Exists for the solely for the purpose of calling #disconnect on that connection. class RemoteConnection class InvalidIdentifiersError < StandardError; end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index a2a89d5f1e..bd6a3826a3 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,7 +1,3 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3385a4c9f3..778f5ffeed 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,9 +1,11 @@ +require 'monitor' + module ActionCable module Server - # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but - # also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers. + # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the Rack process that starts the Action Cable server, but + # is also used by the user to reach the RemoteConnections object, which is used for finding and disconnecting connections across all servers. # - # Also, this is the server instance used for broadcasting. See Broadcasting for details. + # Also, this is the server instance used for broadcasting. See Broadcasting for more information. class Base include ActionCable::Server::Broadcasting include ActionCable::Server::Connections @@ -13,10 +15,14 @@ module ActionCable def self.logger; config.logger; end delegate :logger, to: :config + attr_reader :mutex + def initialize + @mutex = Monitor.new + @remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil end - # Called by rack to setup the server. + # Called by Rack to setup the server. def call(env) setup_heartbeat_timer config.connection_class.new(self, env).process @@ -27,30 +33,46 @@ module ActionCable remote_connections.where(identifiers).disconnect end + def restart + connections.each(&:close) + + @mutex.synchronize do + worker_pool.halt if @worker_pool + + @worker_pool = nil + end + end + # Gateway to RemoteConnections. See that class for details. def remote_connections - @remote_connections ||= RemoteConnections.new(self) + @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } + end + + def event_loop + @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } end # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool - @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) + @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end - # Requires and returns a hash of all the channel class constants keyed by name. + # Requires and returns a hash of all of the channel class constants, which are keyed by name. def channel_classes - @channel_classes ||= begin - config.channel_paths.each { |channel_path| require channel_path } - config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + @channel_classes || @mutex.synchronize do + @channel_classes ||= begin + config.channel_paths.each { |channel_path| require channel_path } + config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + end end end # Adapter used for all streams/broadcasting. def pubsub - @pubsub ||= config.pubsub_adapter.new(self) + @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end - # All the identifiers applied to the connection class associated with this server. + # All of the identifiers applied to the connection class associated with this server. def connection_identifiers config.connection_class.identifiers end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 4a26ed9269..98025f27f2 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,32 +1,32 @@ module ActionCable module Server - # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these + # Broadcasting is how other parts of your application can send messages to a channel's subscribers. As explained in Channel, most of the time, these # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel - # def subscribed - # stream_from "web_notifications_#{current_user.id}" - # end - # end + # def subscribed + # stream_from "web_notifications_#{current_user.id}" + # end + # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob - # ActionCable.server.broadcast \ - # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' } + # # Somewhere in your app this is called, perhaps from a NewCommentJob: + # ActionCable.server.broadcast \ + # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side coffescript, which assumes you've already requested the right to send web notifications - # App.cable.subscriptions.create "WebNotificationsChannel", - # received: (data) -> - # new Notification data['title'], body: data['body'] + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications: + # App.cable.subscriptions.create "WebNotificationsChannel", + # received: (data) -> + # new Notification data['title'], body: data['body'] module Broadcasting - # Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded. + # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. def broadcast(broadcasting, message) broadcaster_for(broadcasting).broadcast(message) end - # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that + # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. def broadcaster_for(broadcasting) - Broadcaster.new(self, broadcasting) + Broadcaster.new(self, String(broadcasting)) end private diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 9a248933c4..5fe71caed2 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -1,31 +1,24 @@ module ActionCable module Server - # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points + # An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak Action Cable configuration # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :connection_class, :worker_pool_size - attr_accessor :channel_load_paths + attr_accessor :use_faye, :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :cable, :url + attr_accessor :cable, :url, :mount_path + + attr_accessor :channel_paths # :nodoc: def initialize @log_tags = [] - @connection_class = ApplicationCable::Connection - @worker_pool_size = 100 - - @channel_load_paths = [Rails.root.join('app/channels')] + @connection_class = ActionCable::Connection::Base + @worker_pool_size = 100 @disable_request_forgery_protection = false end - def channel_paths - @channel_paths ||= channel_load_paths.flat_map do |path| - Dir["#{path}/**/*_channel.rb"] - end - end - def channel_class_names @channel_class_names ||= channel_paths.collect do |channel_path| Pathname.new(channel_path).basename.to_s.split('.').first.camelize @@ -50,6 +43,22 @@ module ActionCable adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize end + + def event_loop_class + if use_faye + ActionCable::Connection::FayeEventLoop + else + ActionCable::Connection::StreamEventLoop + end + end + + def client_socket_class + if use_faye + ActionCable::Connection::FayeClientSocket + else + ActionCable::Connection::ClientSocket + end + end end end end diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 47dcea8c20..5e61b4e335 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -1,9 +1,8 @@ module ActionCable module Server - # Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so - # you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that. - # As such, this is primarily for internal use. - module Connections + # Collection class for all the connections that have been established on this specific server. Remember, usually you'll run many Action Cable servers, so + # you can't use this collection as a full list of all of the connections established against your application. Instead, use RemoteConnections for that. + module Connections # :nodoc: BEAT_INTERVAL = 3 def connections @@ -19,13 +18,11 @@ module ActionCable end # WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you - # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically + # then can't rely on being able to communicate with the connection. To solve this, a 3 second heartbeat runs on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - EM.next_tick do - @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do - EM.next_tick { connections.map(&:beat) } - end + @heartbeat_timer ||= event_loop.timer(BEAT_INTERVAL) do + event_loop.post { connections.map(&:beat) } end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 3b6c6d44a1..49cbaec0c0 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -4,8 +4,8 @@ require 'concurrent' module ActionCable module Server - # Worker used by Server.send_async to do connection work in threads. Only for internal use. - class Worker + # Worker used by Server.send_async to do connection work in threads. + class Worker # :nodoc: include ActiveSupport::Callbacks thread_mattr_accessor :connection @@ -20,6 +20,26 @@ module ActionCable ) end + # Stop processing work: any work that has not already started + # running will be discarded from the queue + def halt + @pool.kill + end + + def stopping? + @pool.shuttingdown? + end + + def work(connection) + self.connection = connection + + run_callbacks :work do + yield + end + ensure + self.connection = nil + end + def async_invoke(receiver, method, *args) @pool.post do invoke(receiver, method, *args) @@ -27,19 +47,15 @@ module ActionCable end def invoke(receiver, method, *args) - begin - self.connection = receiver - - run_callbacks :work do + work(receiver) do + begin receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - receiver.handle_exception if receiver.respond_to?(:handle_exception) - ensure - self.connection = nil + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end end end @@ -50,14 +66,8 @@ module ActionCable end def run_periodic_timer(channel, callback) - begin - self.connection = channel.connection - - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - ensure - self.connection = nil + work(channel.connection) do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end end diff --git a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb index ecece4e270..c1e4aa8103 100644 --- a/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb +++ b/actioncable/lib/action_cable/server/worker/active_record_connection_management.rb @@ -1,7 +1,6 @@ module ActionCable module Server class Worker - # Clear active connections between units of work so the long-running channel or connection processes do not hoard connections. module ActiveRecordConnectionManagement extend ActiveSupport::Concern @@ -13,10 +12,8 @@ module ActionCable def with_database_connections connection.logger.tag(ActiveRecord::Base.logger) { yield } - ensure - ActiveRecord::Base.clear_active_connections! end end end end -end
\ No newline at end of file +end diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..10b3ac8cd8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -4,17 +4,22 @@ module ActionCable module SubscriptionAdapter class Async < Inline # :nodoc: private - def subscriber_map - @subscriber_map ||= AsyncSubscriberMap.new + def new_subscriber_map + AsyncSubscriberMap.new(server.event_loop) end class AsyncSubscriberMap < SubscriberMap + def initialize(event_loop) + @event_loop = event_loop + super() + end + def add_subscriber(*) - ::EM.next_tick { super } + @event_loop.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb new file mode 100644 index 0000000000..256876cf30 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -0,0 +1,75 @@ +require 'thread' + +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' + +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module SubscriptionAdapter + class EventedRedis < Base # :nodoc: + @@mutex = Mutex.new + + # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + + def broadcast(channel, payload) + redis_connection_for_broadcasts.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback { |reply| success_callback.call } if success_callback + end + end + + def unsubscribe(channel, message_callback) + redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + end + + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + + private + def redis_connection_for_subscriptions + ensure_reactor_running + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| + redis.on(:reconnect_failed) do + @logger.error "[ActionCable] Redis reconnect failed." + end + end + end + end + + def redis_connection_for_broadcasts + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) + end + end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb index 4a2a8d23a2..81357faead 100644 --- a/actioncable/lib/action_cable/subscription_adapter/inline.rb +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -1,6 +1,11 @@ module ActionCable module SubscriptionAdapter class Inline < Base # :nodoc: + def initialize(*) + super + @subscriber_map = nil + end + def broadcast(channel, payload) subscriber_map.broadcast(channel, payload) end @@ -19,7 +24,11 @@ module ActionCable private def subscriber_map - @subscriber_map ||= SubscriberMap.new + @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map } + end + + def new_subscriber_map + SubscriberMap.new end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..66c7852f6e 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -5,6 +5,11 @@ require 'thread' module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + def initialize(*) + super + @listener = nil + end + def broadcast(channel, payload) with_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") @@ -37,14 +42,15 @@ module ActionCable private def listener - @listener ||= Listener.new(self) + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @queue = Queue.new @thread = Thread.new do @@ -63,7 +69,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + @event_loop.post(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +99,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + @event_loop.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..65434f7107 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,41 +1,171 @@ -gem 'em-hiredis', '~> 0.3.0' +require 'thread' + gem 'redis', '~> 3.0' -require 'em-hiredis' require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + + def initialize(*) + super + @listener = nil + @redis_connection_for_broadcasts = nil + end + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end - def subscribe(channel, message_callback, success_callback = nil) - redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback { |reply| success_callback.call } if success_callback - end + def subscribe(channel, callback, success_callback = nil) + listener.add_subscriber(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + def unsubscribe(channel, callback) + listener.remove_subscriber(channel, callback) end def shutdown - redis_connection_for_subscriptions.pubsub.close_connection - @redis_connection_for_subscriptions = nil + @listener.shutdown if @listener + end + + def redis_connection_for_subscriptions + redis_connection end private - def redis_connection_for_subscriptions - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." - end - end + def listener + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= redis_connection + end + end + + def redis_connection + self.class.redis_connector.call(@server.config.cable) + end + + class Listener < SubscriberMap + def initialize(adapter, event_loop) + super() + + @adapter = adapter + @event_loop = event_loop + + @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } + @subscription_lock = Mutex.new + + @raw_client = nil + + @when_connected = [] + + @thread = nil + end + + def listen(conn) + conn.without_reconnect do + original_client = conn.client + + conn.subscribe('_action_cable_internal') do |on| + on.subscribe do |chan, count| + @subscription_lock.synchronize do + if count == 1 + @raw_client = original_client + + until @when_connected.empty? + @when_connected.shift.call + end + end + + if callbacks = @subscribe_callbacks[chan] + next_callback = callbacks.shift + @event_loop.post(&next_callback) if next_callback + @subscribe_callbacks.delete(chan) if callbacks.empty? + end + end + end + + on.message do |chan, message| + broadcast(chan, message) + end + + on.unsubscribe do |chan, count| + if count == 0 + @subscription_lock.synchronize do + @raw_client = nil + end + end + end + end + end + end + + def shutdown + @subscription_lock.synchronize do + return if @thread.nil? + + when_connected do + send_command('unsubscribe') + @raw_client = nil + end + end + + Thread.pass while @thread.alive? + end + + def add_channel(channel, on_success) + @subscription_lock.synchronize do + ensure_listener_running + @subscribe_callbacks[channel] << on_success + when_connected { send_command('subscribe', channel) } + end + end + + def remove_channel(channel) + @subscription_lock.synchronize do + when_connected { send_command('unsubscribe', channel) } + end + end + + def invoke_callback(*) + @event_loop.post { super } + end + + private + def ensure_listener_running + @thread ||= Thread.new do + Thread.current.abort_on_exception = true + + conn = @adapter.redis_connection_for_subscriptions + listen conn + end + end + + def when_connected(&block) + if @raw_client + block.call + else + @when_connected << block + end + end + + def send_command(*command) + @raw_client.write(command) + + very_raw_connection = + @raw_client.connection.instance_variable_defined?(:@connection) && + @raw_client.connection.instance_variable_get(:@connection) + + if very_raw_connection && very_raw_connection.respond_to?(:flush) + very_raw_connection.flush + end + end end end end diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/lib/assets/javascripts/action_cable.coffee.erb deleted file mode 100644 index 7daea4ebcd..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require action_cable/consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee deleted file mode 100644 index fcd8d0fb6c..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require action_cable/connection -#= require action_cable/connection_monitor -#= require action_cable/subscriptions -#= require action_cable/subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE index 27a934c689..6249553c22 100644 --- a/actioncable/lib/rails/generators/channel/USAGE +++ b/actioncable/lib/rails/generators/channel/USAGE @@ -3,7 +3,7 @@ Description: Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript). Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments. - Note: Turn on the cable connection in app/assets/javascript/cable.coffee after generating any channels. + Note: Turn on the cable connection in app/assets/javascript/cable.js after generating any channels. Example: ======== diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index c5d398810a..d89ab45816 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -15,11 +15,28 @@ module Rails if options[:assets] template "assets/channel.coffee", File.join('app/assets/javascripts/channels', class_path, "#{file_name}.coffee") end + + generate_application_cable_files end protected def file_name - @_file_name ||= super.gsub(/\_channel/i, '') + @_file_name ||= super.gsub(/_channel/i, '') + end + + # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. + def generate_application_cable_files + return if self.behavior != :invoke + + files = [ + 'application_cable/channel.rb', + 'application_cable/connection.rb' + ] + + files.each do |name| + path = File.join('app/channels/', name) + template(name, path) if !File.exist?(path) + end end end end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb new file mode 100644 index 0000000000..d56fa30f4d --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/channel.rb @@ -0,0 +1,5 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. +module ApplicationCable + class Channel < ActionCable::Channel::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb new file mode 100644 index 0000000000..b4f41389ad --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/application_cable/connection.rb @@ -0,0 +1,5 @@ +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. +module ApplicationCable + class Connection < ActionCable::Connection::Base + end +end diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 6cf04ee61f..7bff3341c1 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,4 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading. +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed |