diff options
Diffstat (limited to 'actioncable/lib/action_cable')
10 files changed, 33 insertions, 133 deletions
diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 2e589a2cfa..a866044f95 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -144,13 +144,14 @@ module ActionCable # When a channel is streaming via pubsub, we want to delay the confirmation # transmission until pubsub subscription is confirmed. - @defer_subscription_confirmation = false + # + # The counter starts at 1 because it's awaiting a call to #subscribe_to_channel + @defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1) @reject_subscription = nil @subscription_confirmation_sent = nil delegate_connection_identifiers - subscribe_to_channel end # Extract the action name from the passed data and process it via the channel. The process will ensure @@ -169,6 +170,17 @@ module ActionCable end end + # This method is called after subscription has been added to the connection + # and confirms or rejects the subscription. + def subscribe_to_channel + run_callbacks :subscribe do + subscribed + end + + reject_subscription if subscription_rejected? + ensure_confirmation_sent + end + # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. def unsubscribe_from_channel # :nodoc: @@ -201,12 +213,18 @@ module ActionCable end end + def ensure_confirmation_sent + return if subscription_rejected? + @defer_subscription_confirmation_counter.decrement + transmit_subscription_confirmation unless defer_subscription_confirmation? + end + def defer_subscription_confirmation! - @defer_subscription_confirmation = true + @defer_subscription_confirmation_counter.increment end def defer_subscription_confirmation? - @defer_subscription_confirmation + @defer_subscription_confirmation_counter.value > 0 end def subscription_confirmation_sent? @@ -230,18 +248,6 @@ module ActionCable end end - def subscribe_to_channel - run_callbacks :subscribe do - subscribed - end - - if subscription_rejected? - reject_subscription - else - transmit_subscription_confirmation unless defer_subscription_confirmation? - end - end - def extract_action(data) (data["action"].presence || :receive).to_sym end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 13deb62662..dbba333353 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -84,7 +84,7 @@ module ActionCable connection.server.event_loop.post do pubsub.subscribe(broadcasting, handler, lambda do - transmit_subscription_confirmation + ensure_confirmation_sent logger.info "#{self.class.name} is streaming from #{broadcasting}" end) end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 5f813cf8e0..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -8,8 +8,6 @@ module ActionCable autoload :ClientSocket autoload :Identification autoload :InternalChannel - autoload :FayeClientSocket - autoload :FayeEventLoop autoload :MessageBuffer autoload :Stream autoload :StreamEventLoop diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 4c7fcc1434..06f4f5edd3 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -57,7 +57,7 @@ module ActionCable @worker_pool = server.worker_pool @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop, server.config.client_socket_class) + @websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) diff --git a/actioncable/lib/action_cable/connection/faye_client_socket.rb b/actioncable/lib/action_cable/connection/faye_client_socket.rb deleted file mode 100644 index 06e92c5d52..0000000000 --- a/actioncable/lib/action_cable/connection/faye_client_socket.rb +++ /dev/null @@ -1,48 +0,0 @@ -require "faye/websocket" - -module ActionCable - module Connection - class FayeClientSocket - def initialize(env, event_target, stream_event_loop, protocols) - @env = env - @event_target = event_target - @protocols = protocols - - @faye = nil - end - - def alive? - @faye && @faye.ready_state == Faye::WebSocket::API::OPEN - end - - def transmit(data) - connect - @faye.send data - end - - def close - @faye && @faye.close - end - - def protocol - @faye && @faye.protocol - end - - def rack_response - connect - @faye.rack_response - end - - private - def connect - return if @faye - @faye = Faye::WebSocket.new(@env, @protocols) - - @faye.on(:open) { |event| @event_target.on_open } - @faye.on(:message) { |event| @event_target.on_message(event.data) } - @faye.on(:close) { |event| @event_target.on_close(event.reason, event.code) } - @faye.on(:error) { |event| @event_target.on_error(event.message) } - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/faye_event_loop.rb b/actioncable/lib/action_cable/connection/faye_event_loop.rb deleted file mode 100644 index cfbe26ee6a..0000000000 --- a/actioncable/lib/action_cable/connection/faye_event_loop.rb +++ /dev/null @@ -1,44 +0,0 @@ -require "thread" - -require "eventmachine" -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - -module ActionCable - module Connection - class FayeEventLoop - @@mutex = Mutex.new - - def timer(interval, &block) - ensure_reactor_running - EMTimer.new(::EM::PeriodicTimer.new(interval, &block)) - end - - def post(task = nil, &block) - task ||= block - - ensure_reactor_running - ::EM.next_tick(&task) - end - - private - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end - - class EMTimer - def initialize(inner) - @inner = inner - end - - def shutdown - @inner.cancel - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 9060183249..00511aead5 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -26,10 +26,14 @@ module ActionCable id_key = data["identifier"] id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + return if subscriptions.key?(id_key) + subscription_klass = id_options[:channel].safe_constantize if subscription_klass && ActionCable::Channel::Base >= subscription_klass - subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) + subscription = subscription_klass.new(connection, id_key, id_options) + subscriptions[id_key] = subscription + subscription.subscribe_to_channel else logger.error "Subscription class not found: #{id_options[:channel].inspect}" end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 52d8daad4b..382141b89f 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -4,8 +4,8 @@ module ActionCable module Connection # Wrap the real socket to minimize the externally-presented API class WebSocket - def initialize(env, event_target, event_loop, client_socket_class, protocols: ActionCable::INTERNAL[:protocols]) - @websocket = ::WebSocket::Driver.websocket?(env) ? client_socket_class.new(env, event_target, event_loop, protocols) : nil + def initialize(env, event_target, event_loop, protocols: ActionCable::INTERNAL[:protocols]) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, event_loop, protocols) : nil end def possible? diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index dd059a553b..67ada7cc2e 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -49,7 +49,7 @@ module ActionCable end def event_loop - @event_loop || @mutex.synchronize { @event_loop ||= config.event_loop_class.new } + @event_loop || @mutex.synchronize { @event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The worker pool is where we run connection callbacks and channel actions. We do as little as possible on the server's main thread. diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 7153593d4c..dc146f07b0 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -4,7 +4,7 @@ module ActionCable # in a Rails config initializer. class Configuration attr_accessor :logger, :log_tags - attr_accessor :use_faye, :connection_class, :worker_pool_size + attr_accessor :connection_class, :worker_pool_size attr_accessor :disable_request_forgery_protection, :allowed_request_origins attr_accessor :cable, :url, :mount_path @@ -35,22 +35,6 @@ module ActionCable adapter = "PostgreSQL" if adapter == "Postgresql" "ActionCable::SubscriptionAdapter::#{adapter}".constantize end - - def event_loop_class - if use_faye - ActionCable::Connection::FayeEventLoop - else - ActionCable::Connection::StreamEventLoop - end - end - - def client_socket_class - if use_faye - ActionCable::Connection::FayeClientSocket - else - ActionCable::Connection::ClientSocket - end - end end end end |