diff options
Diffstat (limited to 'actioncable/lib')
24 files changed, 186 insertions, 193 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index d353716636..c2d3550acb 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -1,5 +1,5 @@ #-- -# Copyright (c) 2015-2016 Basecamp, LLC +# Copyright (c) 2015-2017 Basecamp, LLC # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..6739a62ba0 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -122,16 +122,16 @@ module ActionCable end end - protected + private # action_methods are cached and there is sometimes need to refresh # them. ::clear_action_methods! allows you to do that, so next time # you run action_methods, they will be recalculated. - def clear_action_methods! + def clear_action_methods! # :doc: @action_methods = nil end # Refresh the cached action_methods when a new action_method is added. - def method_added(name) + def method_added(name) # :doc: super clear_action_methods! end @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -177,22 +189,22 @@ module ActionCable end end - protected + private # Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams # you want this channel to be sending to the subscriber. - def subscribed + def subscribed # :doc: # Override in subclasses end # Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking # users as offline or the like. - def unsubscribed + def unsubscribed # :doc: # Override in subclasses end # Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with # the proper channel identifier marked as the recipient. - def transmit(data, via: nil) + def transmit(data, via: nil) # :doc: logger.info "#{self.class.name} transmitting #{data.inspect.truncate(300)}".tap { |m| m << " (via #{via})" if via } payload = { channel_class: self.class.name, data: data, via: via } @@ -201,27 +213,32 @@ module ActionCable end end - def defer_subscription_confirmation! - @defer_subscription_confirmation = true + def ensure_confirmation_sent # :doc: + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + + def defer_subscription_confirmation! # :doc: + @defer_subscription_confirmation_counter.increment end - def defer_subscription_confirmation? - @defer_subscription_confirmation + def defer_subscription_confirmation? # :doc: + @defer_subscription_confirmation_counter.value > 0 end - def subscription_confirmation_sent? + def subscription_confirmation_sent? # :doc: @subscription_confirmation_sent end - def reject + def reject # :doc: @reject_subscription = true end - def subscription_rejected? + def subscription_rejected? # :doc: @reject_subscription end - private def delegate_connection_identifiers connection.identifiers.each do |identifier| define_singleton_method(identifier) do @@ -230,18 +247,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/naming.rb b/actioncable/lib/action_cable/channel/naming.rb index b7e88bf73d..b565cb3cac 100644 --- a/actioncable/lib/action_cable/channel/naming.rb +++ b/actioncable/lib/action_cable/channel/naming.rb @@ -12,7 +12,7 @@ module ActionCable # Chats::AppearancesChannel.channel_name # => 'chats:appearances' # FooChats::BarAppearancesChannel.channel_name # => 'foo_chats:bar_appearances' def channel_name - @channel_name ||= name.sub(/Channel$/, "").gsub("::",":").underscore + @channel_name ||= name.sub(/Channel$/, "").gsub("::", ":").underscore end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 561750d713..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -69,8 +69,8 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) @@ -84,7 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end @@ -94,8 +94,8 @@ module ActionCable # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. # - # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. - # Defaults to `coder: nil` which does no decoding, passes raw messages. + # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. + # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_for(model, callback = nil, coder: nil, &block) stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 5f813cf8e0..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,8 +8,6 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel - autoload :FayeClientSocket - autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b0615b08a1..0a517a532d 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -22,13 +22,10 @@ module ActionCable # # Any cleanup work needed when the cable connection is cut. # end # - # protected + # private # def find_verified_user - # if current_user = User.find_by_identity cookies.signed[:identity_id] - # current_user - # else + # User.find_by_identity(cookies.signed[:identity_id]) || # reject_unauthorized_connection - # end # end # end # end @@ -57,7 +54,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -105,7 +102,7 @@ module ActionCable worker_pool.async_invoke(self, method, *arguments) end - # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. + # Return a basic hash of statistics for the connection keyed with <tt>identifier</tt>, <tt>started_at</tt>, <tt>subscriptions</tt>, and <tt>request_id</tt>. # This can be returned by a health check against the connection. def statistics { @@ -136,9 +133,15 @@ module ActionCable send_async :handle_close end + # TODO Change this to private once we've dropped Ruby 2.2 support. + # Workaround for Ruby 2.2 "private attribute?" warning. protected + attr_reader :websocket + attr_reader :message_buffer + + private # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. - def request + def request # :doc: @request ||= begin environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application ActionDispatch::Request.new(environment || env) @@ -146,14 +149,10 @@ module ActionCable end # The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks. - def cookies + def cookies # :doc: request.cookie_jar end - attr_reader :websocket - attr_reader :message_buffer - - private def encode(cable_message) @coder.encode cable_message end @@ -195,7 +194,10 @@ module ActionCable def allow_request_origin? return true if server.config.disable_request_forgery_protection - if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] } + proto = Rack::Request.new(env).ssl? ? "https" : "http" + if server.config.allow_same_origin_as_host && env["HTTP_ORIGIN"] == "#{proto}://#{env['HTTP_HOST']}" + true + elsif Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env["HTTP_ORIGIN"] } true else logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}") diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb deleted file mode 100644 index 06e92c5d52..0000000000 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ /dev/null @@ -1,48 +0,0 @@ -require "faye/websocket" - -module ActionCable - module Connection - class FayeClientSocket - def initialize(env, event_target, stream_event_loop, protocols) - @env = env - @event_target = event_target - @protocols = protocols - - @faye = nil - end - - def alive? - @faye && @faye.ready_state == Faye::WebSocket::API::OPEN - end - - def transmit(data) - connect - @faye.send data - end - - def close - @faye && @faye.close - end - - def protocol - @faye && @faye.protocol - end - - def rack_response - connect - @faye.rack_response - end - - private - def connect - return if @faye - @faye = Faye::WebSocket.new(@env, @protocols) - - @faye.on(:open) { |event| @event_target.on_open } - @faye.on(:message) { |event| @event_target.on_message(event.data) } - @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } - @faye.on(:error) { |event| @event_target.on_error(event.message) } - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb deleted file mode 100644 index cfbe26ee6a..0000000000 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ /dev/null @@ -1,44 +0,0 @@ -require "thread" - -require "eventmachine" -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - -module ActionCable - module Connection - class FayeEventLoop - @@mutex = Mutex.new - - def timer(interval, &block) - ensure_reactor_running - EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) - end - - def post(task = nil, &block) - task ||= block - - ensure_reactor_running - ::EM.next_tick(&task) - end - - private - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end - - class EMTimer - def initialize(inner) - @inner = inner - end - - def shutdown - @inner.cancel - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 6a80770cae..4ccd322644 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -28,6 +28,8 @@ module ActionCable receive_buffered_messages end + # TODO Change this to private once we've dropped Ruby 2.2 support. + # Workaround for Ruby 2.2 "private attribute?" warning. protected attr_reader :connection attr_reader :buffered_messages diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 5a2aace0ba..e620b93845 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -14,6 +14,9 @@ module ActionCable @rack_hijack_io = nil @write_lock = Mutex.new + + @write_head = nil + @write_buffer = Queue.new end def each(&callback) @@ -30,14 +33,62 @@ module ActionCable end def write(data) - @write_lock.synchronize do - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send + if @stream_send + return @stream_send.call(data) end + + if @write_lock.try_lock + begin + if @write_head.nil? && @write_buffer.empty? + written = @rack_hijack_io.write_nonblock(data, exception: false) + + case written + when :wait_writable + # proceed below + when data.bytesize + return data.bytesize + else + @write_head = data.byteslice(written, data.bytesize) + @event_loop.writes_pending @rack_hijack_io + + return data.bytesize + end + end + ensure + @write_lock.unlock + end + end + + @write_buffer << data + @event_loop.writes_pending @rack_hijack_io + + data.bytesize rescue EOFError, Errno::ECONNRESET @socket_object.client_gone end + def flush_write_buffer + @write_lock.synchronize do + loop do + if @write_head.nil? + return true if @write_buffer.empty? + @write_head = @write_buffer.pop + end + + written = @rack_hijack_io.write_nonblock(@write_head, exception: false) + case written + when :wait_writable + return false + when @write_head.bytesize + @write_head = nil + else + @write_head = @write_head.byteslice(written, @write_head.bytesize) + return false + end + end + end + end + def receive(data) @socket_object.parse(data) end @@ -55,7 +106,6 @@ module ActionCable def clean_rack_hijack return unless @rack_hijack_io @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io.close @rack_hijack_io = nil end end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index 106b948c45..2d1af0ff9f 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -5,7 +5,7 @@ module ActionCable module Connection class StreamEventLoop def initialize - @nio = @thread = nil + @nio = @executor = @thread = nil @map = {} @stopping = false @todo = Queue.new @@ -20,13 +20,14 @@ module ActionCable def post(task = nil, &block) task ||= block - Concurrent.global_io_executor << task + spawn + @executor << task end def attach(io, stream) @todo << lambda do - @map[io] = stream - @nio.register(io, :r) + @map[io] = @nio.register(io, :r) + @map[io].value = stream end wakeup end @@ -35,6 +36,16 @@ module ActionCable @todo << lambda do @nio.deregister io @map.delete io + io.close + end + wakeup + end + + def writes_pending(io) + @todo << lambda do + if monitor = @map[io] + monitor.interests = :rw + end end wakeup end @@ -52,6 +63,13 @@ module ActionCable return if @thread && @thread.status @nio ||= NIO::Selector.new + + @executor ||= Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: 10, + max_queue: 0, + ) + @thread = Thread.new { run } return true @@ -77,12 +95,25 @@ module ActionCable monitors.each do |monitor| io = monitor.io - stream = @map[io] + stream = monitor.value begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next + if monitor.writable? + if stream.flush_write_buffer + monitor.interests = :r + end + next unless monitor.readable? + end + + incoming = io.read_nonblock(4096, exception: false) + case incoming + when :wait_readable + next + when nil + stream.close + else + stream.receive incoming + end rescue # We expect one of EOFError or Errno::ECONNRESET in # normal operation (when the client goes away). But if diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..abf42c99d5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -26,10 +26,14 @@ module ActionCable id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end @@ -57,6 +61,8 @@ module ActionCable subscriptions.each { |id, channel| remove_subscription(channel) } end + # TODO Change this to private once we've dropped Ruby 2.2 support. + # Workaround for Ruby 2.2 "private attribute?" warning. protected attr_reader :connection, :subscriptions diff --git a/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb index 41afa9680a..aef549aa86 100644 --- a/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb +++ b/actioncable/lib/action_cable/connection/tagged_logger_proxy.rb @@ -31,8 +31,8 @@ module ActionCable end end - protected - def log(type, message) + private + def log(type, message) # :doc: tag(@logger) { @logger.send type, message } end end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 52d8daad4b..03eb6e2ea8 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil + def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil end def possible? @@ -32,6 +32,8 @@ module ActionCable websocket.rack_response end + # TODO Change this to private once we've dropped Ruby 2.2 support. + # Workaround for Ruby 2.2 "private attribute?" warning. protected attr_reader :websocket end diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index 4c5c975cd8..e23527b84e 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -34,7 +34,7 @@ module ActionCable previous_connection_class = self.connection_class self.connection_class = -> { "ApplicationCable::Connection".safe_constantize || previous_connection_class.call } - options.each { |k,v| send("#{k}=", v) } + options.each { |k, v| send("#{k}=", v) } end end diff --git a/actioncable/lib/action_cable/helpers/action_cable_helper.rb b/actioncable/lib/action_cable/helpers/action_cable_helper.rb index 2081a37db6..f53be0bc31 100644 --- a/actioncable/lib/action_cable/helpers/action_cable_helper.rb +++ b/actioncable/lib/action_cable/helpers/action_cable_helper.rb @@ -6,7 +6,7 @@ module ActionCable # # <head> # <%= action_cable_meta_tag %> - # <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %> + # <%= javascript_include_tag 'application', 'data-turbolinks-track' => 'reload' %> # </head> # # This is then used by Action Cable to determine the URL of your WebSocket server. diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index 720ba52d19..d2856bc6ae 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -54,7 +54,7 @@ module ActionCable def set_identifier_instance_vars(ids) raise InvalidIdentifiersError unless valid_identifiers?(ids) - ids.each { |k,v| instance_variable_set("@#{k}", v) } + ids.each { |k, v| instance_variable_set("@#{k}", v) } end def valid_identifiers?(ids) diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index c700297a8d..419eccd73c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -37,9 +37,13 @@ module ActionCable connections.each(&:close) @mutex.synchronize do - worker_pool.halt if @worker_pool - + # Shutdown the worker pool + @worker_pool.halt if @worker_pool @worker_pool = nil + + # Shutdown the pub/sub adapter + @pubsub.shutdown if @pubsub + @pubsub = nil end end @@ -49,12 +53,12 @@ module ActionCable end def event_loop - @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } + @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. # The worker pool is an executor service that's backed by a pool of threads working from a task queue. The thread pool size maxes out - # at 4 worker threads by default. Tune the size yourself with `config.action_cable.worker_pool_size`. + # at 4 worker threads by default. Tune the size yourself with <tt>config.action_cable.worker_pool_size</tt>. # # Using Active Record, Redis, etc within your channel actions means you'll get a separate connection from each thread in the worker pool. # Plan your deployment accordingly: 5 servers each running 5 Puma workers each running an 8-thread worker pool means at least 200 database diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 7153593d4c..17e0dee064 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,8 +4,8 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :use_faye, :connection_class, :worker_pool_size - attr_accessor :disable_request_forgery_protection, :allowed_request_origins + attr_accessor :connection_class, :worker_pool_size + attr_accessor :disable_request_forgery_protection, :allowed_request_origins, :allow_same_origin_as_host attr_accessor :cable, :url, :mount_path def initialize @@ -15,6 +15,7 @@ module ActionCable @worker_pool_size = 4 @disable_request_forgery_protection = false + @allow_same_origin_as_host = true end # Returns constant of subscription adapter specified in config/cable.yml. @@ -35,22 +36,6 @@ module ActionCable adapter = "PostgreSQL" if adapter == "Postgresql" "ActionCable::SubscriptionAdapter::#{adapter}".constantize end - - def event_loop_class - if use_faye - ActionCable::Connection::FayeEventLoop - else - ActionCable::Connection::StreamEventLoop - end - end - - def client_socket_class - if use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end - end end end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 7460472551..43639c27af 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -25,7 +25,7 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @executor.kill + @executor.shutdown end def stopping? diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index bcd46d2a0e..c3018c5281 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -68,10 +68,10 @@ module ActionCable end def ensure_reactor_running - return if EventMachine.reactor_running? + return if EventMachine.reactor_running? && EventMachine.reactor_thread @@mutex.synchronize do Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? && EventMachine.reactor_thread end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb index 4ec513e3ba..4cce86dcca 100644 --- a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb +++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb @@ -2,7 +2,7 @@ module ActionCable module SubscriptionAdapter class SubscriberMap def initialize - @subscribers = Hash.new { |h,k| h[k] = [] } + @subscribers = Hash.new { |h, k| h[k] = [] } @sync = Mutex.new end diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE index 6249553c22..dd109fda80 100644 --- a/actioncable/lib/rails/generators/channel/USAGE +++ b/actioncable/lib/rails/generators/channel/USAGE @@ -3,7 +3,7 @@ Description: Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript). Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments. - Note: Turn on the cable connection in app/assets/javascript/cable.js after generating any channels. + Note: Turn on the cable connection in app/assets/javascripts/cable.js after generating any channels. Example: ======== @@ -11,4 +11,4 @@ Example: creates a Chat channel class and CoffeeScript asset: Channel: app/channels/chat_channel.rb - Assets: app/assets/javascript/channels/chat.coffee + Assets: app/assets/javascripts/channels/chat.coffee diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index 20d807c033..04b787c3a4 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -23,7 +23,7 @@ module Rails generate_application_cable_files end - protected + private def file_name @_file_name ||= super.gsub(/_channel/i, "") end |