From 9ea7aa84d16d99fd32ed1877e3fd6631a41e7042 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Wed, 27 Jan 2016 14:33:15 +0100 Subject: Revert "Eliminate the EventMachine dependency" --- .../lib/action_cable/channel/periodic_timers.rb | 4 +- actioncable/lib/action_cable/channel/streams.rb | 2 +- actioncable/lib/action_cable/connection.rb | 5 +- actioncable/lib/action_cable/connection/base.rb | 32 ++--- .../lib/action_cable/connection/client_socket.rb | 152 --------------------- .../action_cable/connection/internal_channel.rb | 4 +- actioncable/lib/action_cable/connection/stream.rb | 59 -------- .../action_cable/connection/stream_event_loop.rb | 68 --------- .../lib/action_cable/connection/web_socket.rb | 22 ++- actioncable/lib/action_cable/process/logging.rb | 7 + actioncable/lib/action_cable/server.rb | 4 + actioncable/lib/action_cable/server/base.rb | 4 - actioncable/lib/action_cable/server/connections.rb | 8 +- .../lib/action_cable/subscription_adapter/async.rb | 4 +- .../subscription_adapter/postgresql.rb | 4 +- .../lib/action_cable/subscription_adapter/redis.rb | 16 --- 16 files changed, 46 insertions(+), 349 deletions(-) delete mode 100644 actioncable/lib/action_cable/connection/client_socket.rb delete mode 100644 actioncable/lib/action_cable/connection/stream.rb delete mode 100644 actioncable/lib/action_cable/connection/stream_event_loop.rb create mode 100644 actioncable/lib/action_cable/process/logging.rb (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d7..7f0fb37afc 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.each { |timer| timer.cancel } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index a26373e387..e2876ef6fa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - Concurrent.global_io_executor.post do + EM.next_tick do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2..b672e00682 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,15 +5,12 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base - autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :Stream - autoload :StreamEventLoop + autoload :WebSocket autoload :Subscriptions autoload :TaggedLoggerProxy - autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 0016d1a1a4..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + delegate :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,6 +70,10 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + respond_to_successful_request else respond_to_invalid_request @@ -117,22 +121,6 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end - def on_open # :nodoc: - send_async :handle_open - end - - def on_message(message) # :nodoc: - message_buffer.append message - end - - def on_error(message) # :nodoc: - # ignore - end - - def on_close # :nodoc: - send_async :handle_close - end - protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -151,7 +139,7 @@ module ActionCable attr_reader :message_buffer private - def handle_open + def on_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -162,7 +150,11 @@ module ActionCable respond_to_invalid_request end - def handle_close + def on_message(message) + message_buffer.append message + end + + def on_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb deleted file mode 100644 index 62dd753646..0000000000 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ /dev/null @@ -1,152 +0,0 @@ -require 'websocket/driver' - -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class ClientSocket # :nodoc: - def self.determine_url(env) - scheme = secure_request?(env) ? 'wss:' : 'ws:' - "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" - end - - def self.secure_request?(env) - return true if env['HTTPS'] == 'on' - return true if env['HTTP_X_FORWARDED_SSL'] == 'on' - return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' - return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' - return true if env['rack.url_scheme'] == 'https' - - return false - end - - CONNECTING = 0 - OPEN = 1 - CLOSING = 2 - CLOSED = 3 - - attr_reader :env, :url - - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop - - @url = ClientSocket.determine_url(@env) - - @driver = @driver_started = nil - - @ready_state = CONNECTING - - # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) - - @driver.on(:open) { |e| open } - @driver.on(:message) { |e| receive_message(e.data) } - @driver.on(:close) { |e| begin_close(e.reason, e.code) } - @driver.on(:error) { |e| emit_error(e.message) } - - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) - - if callback = @env['async.callback'] - callback.call([101, {}, @stream]) - end - end - - def start_driver - return if @driver.nil? || @driver_started - @driver_started = true - @driver.start - end - - def rack_response - start_driver - [ -1, {}, [] ] - end - - def write(data) - @stream.write(data) - end - - def transmit(message) - return false if @ready_state > OPEN - case message - when Numeric then @driver.text(message.to_s) - when String then @driver.text(message) - when Array then @driver.binary(message) - else false - end - end - - def close(code = nil, reason = nil) - code ||= 1000 - reason ||= '' - - unless code == 1000 or (code >= 3000 and code <= 4999) - raise ArgumentError, "Failed to execute 'close' on WebSocket: " + - "The code must be either 1000, or between 3000 and 4999. " + - "#{code} is neither." - end - - @ready_state = CLOSING unless @ready_state == CLOSED - @driver.close(reason, code) - end - - def parse(data) - @driver.parse(data) - end - - def client_gone - finalize_close - end - - def alive? - @ready_state == OPEN - end - - private - def open - return unless @ready_state == CONNECTING - @ready_state = OPEN - - @event_target.on_open - end - - def receive_message(data) - return unless @ready_state == OPEN - - @event_target.on_message(data) - end - - def emit_error(message) - return if @ready_state >= CLOSING - - @event_target.on_error(message) - end - - def begin_close(reason, code) - return if @ready_state == CLOSED - @ready_state = CLOSING - @close_params = [reason, code] - - if @stream - @stream.shutdown - else - finalize_close - end - end - - def finalize_close - return if @ready_state == CLOSED - @ready_state = CLOSED - - reason = @close_params ? @close_params[0] : '' - code = @close_params ? @close_params[1] : 1006 - - @event_target.on_close(code, reason) - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..54ed7672d2 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + EM.next_tick { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb deleted file mode 100644 index ace250cd16..0000000000 --- a/actioncable/lib/action_cable/connection/stream.rb +++ /dev/null @@ -1,59 +0,0 @@ -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class Stream - def initialize(event_loop, socket) - @event_loop = event_loop - @socket_object = socket - @stream_send = socket.env['stream.send'] - - @rack_hijack_io = nil - - hijack_rack_socket - end - - def each(&callback) - @stream_send ||= callback - end - - def close - shutdown - @socket_object.client_gone - end - - def shutdown - clean_rack_hijack - end - - def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send - rescue EOFError - @socket_object.client_gone - end - - def receive(data) - @socket_object.parse(data) - end - - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] - - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] - - @event_loop.attach(@rack_hijack_io, self) - end - - def clean_rack_hijack - return unless @rack_hijack_io - @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io = nil - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb deleted file mode 100644 index f773814973..0000000000 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ /dev/null @@ -1,68 +0,0 @@ -require 'nio' - -module ActionCable - module Connection - class StreamEventLoop - def initialize - @nio = NIO::Selector.new - @map = {} - @stopping = false - @todo = Queue.new - - Thread.new do - Thread.current.abort_on_exception = true - run - end - end - - def attach(io, stream) - @todo << lambda do - @map[io] = stream - @nio.register(io, :r) - end - @nio.wakeup - end - - def detach(io, stream) - @todo << lambda do - @nio.deregister(io) - @map.delete io - end - @nio.wakeup - end - - def stop - @stopping = true - @nio.wakeup - end - - def run - loop do - if @stopping - @nio.close - break - end - - until @todo.empty? - @todo.pop(true).call - end - - if monitors = @nio.select - monitors.each do |monitor| - io = monitor.io - stream = @map[io] - - begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next - rescue EOFError - stream.close - end - end - end - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..670d5690ae 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,11 +1,13 @@ -require 'websocket/driver' +require 'faye/websocket' module ActionCable module Connection - # Wrap the real socket to minimize the externally-presented API + # Decorate the Faye::WebSocket with helpers we need. class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil end def possible? @@ -13,19 +15,11 @@ module ActionCable end def alive? - websocket && websocket.alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end def transmit(data) - websocket.transmit data - end - - def close - websocket.close - end - - def rack_response - websocket.rack_response + websocket.send data end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb new file mode 100644 index 0000000000..dce637b3ca --- /dev/null +++ b/actioncable/lib/action_cable/process/logging.rb @@ -0,0 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' + +EM.error_handler do |e| + puts "Error raised inside the event loop: #{e.message}" + puts e.backtrace.join("\n") +end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index bd6a3826a3..a2a89d5f1e 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,3 +1,7 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index b00abd208c..3385a4c9f3 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,10 +32,6 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new - end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebd..47dcea8c20 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,9 +22,11 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..85d4892e4c 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..78f8aeb599 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + ::EM.next_tick(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..3b86354621 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,18 +1,11 @@ -require 'thread' - gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -34,7 +27,6 @@ module ActionCable private def redis_connection_for_subscriptions - ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -45,14 +37,6 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end - - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end end end end -- cgit v1.2.3 From 74497eabd52f2f9f8c383808b11286283046c2b2 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 15:25:31 +1030 Subject: Revert "Revert "Eliminate the EventMachine dependency"" --- .../lib/action_cable/channel/periodic_timers.rb | 4 +- actioncable/lib/action_cable/channel/streams.rb | 2 +- actioncable/lib/action_cable/connection.rb | 5 +- actioncable/lib/action_cable/connection/base.rb | 32 +++-- .../lib/action_cable/connection/client_socket.rb | 152 +++++++++++++++++++++ .../action_cable/connection/internal_channel.rb | 4 +- actioncable/lib/action_cable/connection/stream.rb | 59 ++++++++ .../action_cable/connection/stream_event_loop.rb | 68 +++++++++ .../lib/action_cable/connection/web_socket.rb | 22 +-- actioncable/lib/action_cable/process/logging.rb | 7 - actioncable/lib/action_cable/server.rb | 4 - actioncable/lib/action_cable/server/base.rb | 4 + actioncable/lib/action_cable/server/connections.rb | 8 +- .../lib/action_cable/subscription_adapter/async.rb | 4 +- .../subscription_adapter/postgresql.rb | 4 +- .../lib/action_cable/subscription_adapter/redis.rb | 16 +++ 16 files changed, 349 insertions(+), 46 deletions(-) create mode 100644 actioncable/lib/action_cable/connection/client_socket.rb create mode 100644 actioncable/lib/action_cable/connection/stream.rb create mode 100644 actioncable/lib/action_cable/connection/stream_event_loop.rb delete mode 100644 actioncable/lib/action_cable/process/logging.rb (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..56597d02d7 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do + active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.cancel } + active_periodic_timers.each { |timer| timer.shutdown } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e2876ef6fa..a26373e387 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick do + Concurrent.global_io_executor.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index b672e00682..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,12 +5,15 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base + autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :WebSocket + autoload :Stream + autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..0016d1a1a4 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + delegate :stream_event_loop, :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env) + @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,10 +70,6 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? - websocket.on(:open) { |event| send_async :on_open } - websocket.on(:message) { |event| on_message event.data } - websocket.on(:close) { |event| send_async :on_close } - respond_to_successful_request else respond_to_invalid_request @@ -121,6 +117,22 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end + def on_open # :nodoc: + send_async :handle_open + end + + def on_message(message) # :nodoc: + message_buffer.append message + end + + def on_error(message) # :nodoc: + # ignore + end + + def on_close # :nodoc: + send_async :handle_close + end + protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -139,7 +151,7 @@ module ActionCable attr_reader :message_buffer private - def on_open + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -150,11 +162,7 @@ module ActionCable respond_to_invalid_request end - def on_message(message) - message_buffer.append message - end - - def on_close + def handle_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb new file mode 100644 index 0000000000..62dd753646 --- /dev/null +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -0,0 +1,152 @@ +require 'websocket/driver' + +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class ClientSocket # :nodoc: + def self.determine_url(env) + scheme = secure_request?(env) ? 'wss:' : 'ws:' + "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" + end + + def self.secure_request?(env) + return true if env['HTTPS'] == 'on' + return true if env['HTTP_X_FORWARDED_SSL'] == 'on' + return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' + return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' + return true if env['rack.url_scheme'] == 'https' + + return false + end + + CONNECTING = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 + + attr_reader :env, :url + + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + @stream_event_loop = stream_event_loop + + @url = ClientSocket.determine_url(@env) + + @driver = @driver_started = nil + + @ready_state = CONNECTING + + # The driver calls +env+, +url+, and +write+ + @driver = ::WebSocket::Driver.rack(self) + + @driver.on(:open) { |e| open } + @driver.on(:message) { |e| receive_message(e.data) } + @driver.on(:close) { |e| begin_close(e.reason, e.code) } + @driver.on(:error) { |e| emit_error(e.message) } + + @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + + if callback = @env['async.callback'] + callback.call([101, {}, @stream]) + end + end + + def start_driver + return if @driver.nil? || @driver_started + @driver_started = true + @driver.start + end + + def rack_response + start_driver + [ -1, {}, [] ] + end + + def write(data) + @stream.write(data) + end + + def transmit(message) + return false if @ready_state > OPEN + case message + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end + end + + def close(code = nil, reason = nil) + code ||= 1000 + reason ||= '' + + unless code == 1000 or (code >= 3000 and code <= 4999) + raise ArgumentError, "Failed to execute 'close' on WebSocket: " + + "The code must be either 1000, or between 3000 and 4999. " + + "#{code} is neither." + end + + @ready_state = CLOSING unless @ready_state == CLOSED + @driver.close(reason, code) + end + + def parse(data) + @driver.parse(data) + end + + def client_gone + finalize_close + end + + def alive? + @ready_state == OPEN + end + + private + def open + return unless @ready_state == CONNECTING + @ready_state = OPEN + + @event_target.on_open + end + + def receive_message(data) + return unless @ready_state == OPEN + + @event_target.on_message(data) + end + + def emit_error(message) + return if @ready_state >= CLOSING + + @event_target.on_error(message) + end + + def begin_close(reason, code) + return if @ready_state == CLOSED + @ready_state = CLOSING + @close_params = [reason, code] + + if @stream + @stream.shutdown + else + finalize_close + end + end + + def finalize_close + return if @ready_state == CLOSED + @ready_state = CLOSED + + reason = @close_params ? @close_params[0] : '' + code = @close_params ? @close_params[1] : 1006 + + @event_target.on_close(code, reason) + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 54ed7672d2..27826792b3 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, callback) } + Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb new file mode 100644 index 0000000000..ace250cd16 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -0,0 +1,59 @@ +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class Stream + def initialize(event_loop, socket) + @event_loop = event_loop + @socket_object = socket + @stream_send = socket.env['stream.send'] + + @rack_hijack_io = nil + + hijack_rack_socket + end + + def each(&callback) + @stream_send ||= callback + end + + def close + shutdown + @socket_object.client_gone + end + + def shutdown + clean_rack_hijack + end + + def write(data) + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + rescue EOFError + @socket_object.client_gone + end + + def receive(data) + @socket_object.parse(data) + end + + private + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] + + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] + + @event_loop.attach(@rack_hijack_io, self) + end + + def clean_rack_hijack + return unless @rack_hijack_io + @event_loop.detach(@rack_hijack_io, self) + @rack_hijack_io = nil + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb new file mode 100644 index 0000000000..f773814973 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -0,0 +1,68 @@ +require 'nio' + +module ActionCable + module Connection + class StreamEventLoop + def initialize + @nio = NIO::Selector.new + @map = {} + @stopping = false + @todo = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + run + end + end + + def attach(io, stream) + @todo << lambda do + @map[io] = stream + @nio.register(io, :r) + end + @nio.wakeup + end + + def detach(io, stream) + @todo << lambda do + @nio.deregister(io) + @map.delete io + end + @nio.wakeup + end + + def stop + @stopping = true + @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + if monitors = @nio.select + monitors.each do |monitor| + io = monitor.io + stream = @map[io] + + begin + stream.receive io.read_nonblock(4096) + rescue IO::WaitReadable + next + rescue EOFError + stream.close + end + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 670d5690ae..5e89fb9b72 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,13 +1,11 @@ -require 'faye/websocket' +require 'websocket/driver' module ActionCable module Connection - # Decorate the Faye::WebSocket with helpers we need. + # Wrap the real socket to minimize the externally-presented API class WebSocket - delegate :rack_response, :close, :on, to: :websocket - - def initialize(env) - @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + def initialize(env, event_target, stream_event_loop) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil end def possible? @@ -15,11 +13,19 @@ module ActionCable end def alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.alive? end def transmit(data) - websocket.send data + websocket.transmit data + end + + def close + websocket.close + end + + def rack_response + websocket.rack_response end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb deleted file mode 100644 index dce637b3ca..0000000000 --- a/actioncable/lib/action_cable/process/logging.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'action_cable/server' -require 'eventmachine' - -EM.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") -end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index a2a89d5f1e..bd6a3826a3 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,7 +1,3 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3385a4c9f3..b00abd208c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,6 +32,10 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + end + # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 47dcea8c20..8671dd5ebd 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,11 +22,9 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - EM.next_tick do - @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do - EM.next_tick { connections.map(&:beat) } - end - end + @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do + Concurrent.global_io_executor.post { connections.map(&:beat) } + end.tap(&:execute) end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..c88b03947a 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..3ce1bbed68 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + Concurrent.global_io_executor << callback if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..a035e3988d 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,11 +1,18 @@ +require 'thread' + gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + @@mutex = Mutex.new + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -27,6 +34,7 @@ module ActionCable private def redis_connection_for_subscriptions + ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -37,6 +45,14 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end end end end -- cgit v1.2.3 From a928aa3d3f1e6f8780acc22d69f4d5d1f5917926 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 25 Jan 2016 03:55:05 +1030 Subject: Fix arguments to on_close --- actioncable/lib/action_cable/connection/base.rb | 2 +- actioncable/lib/action_cable/connection/client_socket.rb | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 0016d1a1a4..b5f898436a 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -129,7 +129,7 @@ module ActionCable # ignore end - def on_close # :nodoc: + def on_close(reason, code) # :nodoc: send_async :handle_close end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index 62dd753646..ef937d7c16 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -37,6 +37,7 @@ module ActionCable @url = ClientSocket.determine_url(@env) @driver = @driver_started = nil + @close_params = ['', 1006] @ready_state = CONNECTING @@ -142,10 +143,7 @@ module ActionCable return if @ready_state == CLOSED @ready_state = CLOSED - reason = @close_params ? @close_params[0] : '' - code = @close_params ? @close_params[1] : 1006 - - @event_target.on_close(code, reason) + @event_target.on_close(*@close_params) end end end -- cgit v1.2.3 From 16a6603956551703e3bbd06101c568a73bcdaa52 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 25 Jan 2016 03:53:27 +1030 Subject: Synchronize the lazy setters in Server They're all at risk of races on the first requests. --- actioncable/lib/action_cable/server/base.rb | 23 +++++++++++++++------- .../lib/action_cable/subscription_adapter/async.rb | 4 ++-- .../action_cable/subscription_adapter/inline.rb | 11 ++++++++++- .../subscription_adapter/postgresql.rb | 7 ++++++- .../lib/action_cable/subscription_adapter/redis.rb | 17 ++++++++++++---- 5 files changed, 47 insertions(+), 15 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index b00abd208c..fe48c112df 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,3 +1,5 @@ +require 'thread' + module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -13,7 +15,12 @@ module ActionCable def self.logger; config.logger; end delegate :logger, to: :config + attr_reader :mutex + def initialize + @mutex = Mutex.new + + @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil end # Called by rack to setup the server. @@ -29,29 +36,31 @@ module ActionCable # Gateway to RemoteConnections. See that class for details. def remote_connections - @remote_connections ||= RemoteConnections.new(self) + @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool - @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) + @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end # Requires and returns a hash of all the channel class constants keyed by name. def channel_classes - @channel_classes ||= begin - config.channel_paths.each { |channel_path| require channel_path } - config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + @channel_classes || @mutex.synchronize do + @channel_classes ||= begin + config.channel_paths.each { |channel_path| require channel_path } + config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + end end end # Adapter used for all streams/broadcasting. def pubsub - @pubsub ||= config.pubsub_adapter.new(self) + @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..cca6894289 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -4,8 +4,8 @@ module ActionCable module SubscriptionAdapter class Async < Inline # :nodoc: private - def subscriber_map - @subscriber_map ||= AsyncSubscriberMap.new + def new_subscriber_map + AsyncSubscriberMap.new end class AsyncSubscriberMap < SubscriberMap diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb index 4a2a8d23a2..81357faead 100644 --- a/actioncable/lib/action_cable/subscription_adapter/inline.rb +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -1,6 +1,11 @@ module ActionCable module SubscriptionAdapter class Inline < Base # :nodoc: + def initialize(*) + super + @subscriber_map = nil + end + def broadcast(channel, payload) subscriber_map.broadcast(channel, payload) end @@ -19,7 +24,11 @@ module ActionCable private def subscriber_map - @subscriber_map ||= SubscriberMap.new + @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map } + end + + def new_subscriber_map + SubscriberMap.new end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..abaeb92e54 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -5,6 +5,11 @@ require 'thread' module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + def initialize(*) + super + @listener = nil + end + def broadcast(channel, payload) with_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") @@ -37,7 +42,7 @@ module ActionCable private def listener - @listener ||= Listener.new(self) + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end class Listener < SubscriberMap diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..560b79df16 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -13,6 +13,11 @@ module ActionCable class Redis < Base # :nodoc: @@mutex = Mutex.new + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -35,15 +40,19 @@ module ActionCable private def redis_connection_for_subscriptions ensure_reactor_running - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end end end end def redis_connection_for_broadcasts - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end end def ensure_reactor_running -- cgit v1.2.3 From 786ed1b3ad8eeddb911211b67031016730ed55c8 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 18:46:14 +1030 Subject: Handle more IO errors (especially, ECONNRESET) Also, address the possibility of the listen thread dying and needing to be respawned. As a bonus, we now defer construction of the thread until we are first given something to monitor. --- .../action_cable/connection/stream_event_loop.rb | 68 +++++++++++++++------- 1 file changed, 47 insertions(+), 21 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index f773814973..e6335082d2 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -1,18 +1,17 @@ require 'nio' +require 'thread' module ActionCable module Connection class StreamEventLoop def initialize - @nio = NIO::Selector.new + @nio = @thread = nil @map = {} @stopping = false @todo = Queue.new - Thread.new do - Thread.current.abort_on_exception = true - run - end + @spawn_mutex = Mutex.new + spawn end def attach(io, stream) @@ -20,34 +19,53 @@ module ActionCable @map[io] = stream @nio.register(io, :r) end - @nio.wakeup + wakeup end def detach(io, stream) @todo << lambda do - @nio.deregister(io) + @nio.deregister io @map.delete io end - @nio.wakeup + wakeup end def stop @stopping = true - @nio.wakeup + wakeup if @nio end - def run - loop do - if @stopping - @nio.close - break - end + private + def spawn + return if @thread && @thread.status + + @spawn_mutex.synchronize do + return if @thread && @thread.status + + @nio ||= NIO::Selector.new + @thread = Thread.new { run } - until @todo.empty? - @todo.pop(true).call + return true end + end + + def wakeup + spawn || @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + next unless monitors = @nio.select - if monitors = @nio.select monitors.each do |monitor| io = monitor.io stream = @map[io] @@ -56,13 +74,21 @@ module ActionCable stream.receive io.read_nonblock(4096) rescue IO::WaitReadable next - rescue EOFError - stream.close + rescue + # We expect one of EOFError or Errno::ECONNRESET in + # normal operation (when the client goes away). But if + # anything else goes wrong, this is still the best way + # to handle it. + begin + stream.close + rescue + @nio.deregister io + @map.delete io + end end end end end - end end end end -- cgit v1.2.3 From 896950a605c509f19f3e8cbde11e23ca87036ca3 Mon Sep 17 00:00:00 2001 From: Javan Makhmali Date: Sat, 30 Jan 2016 15:41:14 -0500 Subject: Add task to create precompiled action_cable.js and reorganize to accommodate --- .../lib/assets/javascripts/action_cable.coffee.erb | 23 ------ .../javascripts/action_cable/connection.coffee | 81 ---------------------- .../action_cable/connection_monitor.coffee | 79 --------------------- .../javascripts/action_cable/consumer.coffee | 25 ------- .../lib/assets/javascripts/action_cable/index.js | 1 + .../action_cable/source/connection.coffee | 81 ++++++++++++++++++++++ .../action_cable/source/connection_monitor.coffee | 79 +++++++++++++++++++++ .../action_cable/source/consumer.coffee | 25 +++++++ .../action_cable/source/index.coffee.erb | 23 ++++++ .../action_cable/source/subscription.coffee | 68 ++++++++++++++++++ .../action_cable/source/subscriptions.coffee | 64 +++++++++++++++++ .../javascripts/action_cable/subscription.coffee | 68 ------------------ .../javascripts/action_cable/subscriptions.coffee | 64 ----------------- 13 files changed, 341 insertions(+), 340 deletions(-) delete mode 100644 actioncable/lib/assets/javascripts/action_cable.coffee.erb delete mode 100644 actioncable/lib/assets/javascripts/action_cable/connection.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/consumer.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/index.js create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/subscription.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee (limited to 'actioncable/lib') diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/lib/assets/javascripts/action_cable.coffee.erb deleted file mode 100644 index 7daea4ebcd..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require action_cable/consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee deleted file mode 100644 index fcd8d0fb6c..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require action_cable/connection -#= require action_cable/connection_monitor -#= require action_cable/subscriptions -#= require action_cable/subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/index.js b/actioncable/lib/assets/javascripts/action_cable/index.js new file mode 100644 index 0000000000..6c69e42337 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/index.js @@ -0,0 +1 @@ +//= require ./dist/action_cable diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee new file mode 100644 index 0000000000..fbd7dbd35b --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee @@ -0,0 +1,81 @@ +# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +{message_types} = ActionCable.INTERNAL + +class ActionCable.Connection + @reopenDelay: 500 + + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @webSocket.send(JSON.stringify(data)) + true + else + false + + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") + else + @webSocket = new WebSocket(@consumer.url) + @installEventHandlers() + true + + close: -> + @webSocket?.close() + + reopen: -> + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) + + isOpen: -> + @isState("open") + + # Private + + isState: (states...) -> + @getState() in states + + getState: -> + return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState + null + + installEventHandlers: -> + for eventName of @events + handler = @events[eventName].bind(this) + @webSocket["on#{eventName}"] = handler + return + + events: + message: (event) -> + {identifier, message, type} = JSON.parse(event.data) + + switch type + when message_types.confirmation + @consumer.subscriptions.notify(identifier, "connected") + when message_types.rejection + @consumer.subscriptions.reject(identifier) + else + @consumer.subscriptions.notify(identifier, "received", message) + + open: -> + @disconnected = false + @consumer.subscriptions.reload() + + close: -> + @disconnect() + + error: -> + @disconnect() + + disconnect: -> + return if @disconnected + @disconnected = true + @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee new file mode 100644 index 0000000000..99b9a1c6d5 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee @@ -0,0 +1,79 @@ +# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting +# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. +class ActionCable.ConnectionMonitor + @pollInterval: + min: 3 + max: 30 + + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: ActionCable.INTERNAL.identifiers.ping + + constructor: (@consumer) -> + @consumer.subscriptions.add(this) + @start() + + connected: -> + @reset() + @pingedAt = now() + delete @disconnectedAt + + disconnected: -> + @disconnectedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @reconnectAttempts = 0 + + start: -> + @reset() + delete @stoppedAt + @startedAt = now() + @poll() + document.addEventListener("visibilitychange", @visibilityDidChange) + + stop: -> + @stoppedAt = now() + document.removeEventListener("visibilitychange", @visibilityDidChange) + + poll: -> + setTimeout => + unless @stoppedAt + @reconnectIfStale() + @poll() + , @getInterval() + + getInterval: -> + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) + clamp(interval, min, max) * 1000 + + reconnectIfStale: -> + if @connectionIsStale() + @reconnectAttempts++ + unless @disconnectedRecently() + @consumer.connection.reopen() + + connectionIsStale: -> + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold + + visibilityDidChange: => + if document.visibilityState is "visible" + setTimeout => + if @connectionIsStale() or not @consumer.connection.isOpen() + @consumer.connection.reopen() + , 200 + + now = -> + new Date().getTime() + + secondsSince = (time) -> + (now() - time) / 1000 + + clamp = (number, min, max) -> + Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee new file mode 100644 index 0000000000..717c0641a9 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee @@ -0,0 +1,25 @@ +#= require ./connection +#= require ./connection_monitor +#= require ./subscriptions +#= require ./subscription + +# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, +# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. +# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription +# method. +# +# The following example shows how this can be setup: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Consumer + constructor: (@url) -> + @subscriptions = new ActionCable.Subscriptions this + @connection = new ActionCable.Connection this + @connectionMonitor = new ActionCable.ConnectionMonitor this + + send: (data) -> + @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb b/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb new file mode 100644 index 0000000000..f4615b7502 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb @@ -0,0 +1,23 @@ +#= require_self +#= require ./consumer + +@ActionCable = + INTERNAL: <%= ActionCable::INTERNAL.to_json %> + + createConsumer: (url = @getConfig("url")) -> + new ActionCable.Consumer @createWebSocketURL(url) + + getConfig: (name) -> + element = document.head.querySelector("meta[name='action-cable-#{name}']") + element?.getAttribute("content") + + createWebSocketURL: (url) -> + if url and not /^wss?:/i.test(url) + a = document.createElement("a") + a.href = url + # Fix populating Location properties in IE. Otherwise, protocol will be blank. + a.href = a.href + a.protocol = a.protocol.replace("http", "ws") + a.href + else + url diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee new file mode 100644 index 0000000000..339d676933 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee @@ -0,0 +1,68 @@ +# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +# Channel instance on the server side. +# +# An example demonstrates the basic functionality: +# +# App.appearance = App.cable.subscriptions.create "AppearanceChannel", +# connected: -> +# # Called once the subscription has been successfully completed +# +# appear: -> +# @perform 'appear', appearing_on: @appearingOn() +# +# away: -> +# @perform 'away' +# +# appearingOn: -> +# $('main').data 'appearing-on' +# +# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server +# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). +# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. +# +# This is how the server component would look: +# +# class AppearanceChannel < ApplicationActionCable::Channel +# def subscribed +# current_user.appear +# end +# +# def unsubscribed +# current_user.disappear +# end +# +# def appear(data) +# current_user.appear on: data['appearing_on'] +# end +# +# def away +# current_user.away +# end +# end +# +# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. +# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. +class ActionCable.Subscription + constructor: (@subscriptions, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @subscriptions.add(this) + @consumer = @subscriptions.consumer + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @subscriptions.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee new file mode 100644 index 0000000000..ae041ffa2b --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee @@ -0,0 +1,64 @@ +# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user +# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Subscriptions + constructor: (@consumer) -> + @subscriptions = [] + + create: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new ActionCable.Subscription this, params, mixin + + # Private + + add: (subscription) -> + @subscriptions.push(subscription) + @notify(subscription, "initialized") + @sendCommand(subscription, "subscribe") + + remove: (subscription) -> + @forget(subscription) + + unless @findAll(subscription.identifier).length + @sendCommand(subscription, "unsubscribe") + + reject: (identifier) -> + for subscription in @findAll(identifier) + @forget(subscription) + @notify(subscription, "rejected") + + forget: (subscription) -> + @subscriptions = (s for s in @subscriptions when s isnt subscription) + + findAll: (identifier) -> + s for s in @subscriptions when s.identifier is identifier + + reload: -> + for subscription in @subscriptions + @sendCommand(subscription, "subscribe") + + notifyAll: (callbackName, args...) -> + for subscription in @subscriptions + @notify(subscription, callbackName, args...) + + notify: (subscription, callbackName, args...) -> + if typeof subscription is "string" + subscriptions = @findAll(subscription) + else + subscriptions = [subscription] + + for subscription in subscriptions + subscription[callbackName]?(args...) + + sendCommand: (subscription, command) -> + {identifier} = subscription + if identifier is ActionCable.INTERNAL.identifiers.ping + @consumer.connection.isOpen() + else + @consumer.send({command, identifier}) diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) -- cgit v1.2.3 From 09a706065952d58d515420b19a55df619eb7f53d Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Sat, 30 Jan 2016 20:39:22 -0500 Subject: Improvements and reorganization of assets --- .../lib/assets/javascripts/action_cable/index.js | 1 - .../action_cable/source/connection.coffee | 81 ---------------------- .../action_cable/source/connection_monitor.coffee | 79 --------------------- .../action_cable/source/consumer.coffee | 25 ------- .../action_cable/source/index.coffee.erb | 23 ------ .../action_cable/source/subscription.coffee | 68 ------------------ .../action_cable/source/subscriptions.coffee | 64 ----------------- 7 files changed, 341 deletions(-) delete mode 100644 actioncable/lib/assets/javascripts/action_cable/index.js delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee (limited to 'actioncable/lib') diff --git a/actioncable/lib/assets/javascripts/action_cable/index.js b/actioncable/lib/assets/javascripts/action_cable/index.js deleted file mode 100644 index 6c69e42337..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/index.js +++ /dev/null @@ -1 +0,0 @@ -//= require ./dist/action_cable diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee deleted file mode 100644 index 717c0641a9..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require ./connection -#= require ./connection_monitor -#= require ./subscriptions -#= require ./subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb b/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb deleted file mode 100644 index f4615b7502..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require ./consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) -- cgit v1.2.3 From e77368637e17e6a33db2713f651e85a09456c645 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 01:30:00 +1030 Subject: Switch the default redis adapter to a single-stream model This new adapter does get a little more intimate with the redis-rb gem's implementation than I would like, but it's the least bad of the approaches I've come up with. --- .../subscription_adapter/evented_redis.rb | 67 +++++++++ .../lib/action_cable/subscription_adapter/redis.rb | 156 +++++++++++++++++---- 2 files changed, 193 insertions(+), 30 deletions(-) create mode 100644 actioncable/lib/action_cable/subscription_adapter/evented_redis.rb (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb new file mode 100644 index 0000000000..d697548cbd --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -0,0 +1,67 @@ +require 'thread' + +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' + +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module SubscriptionAdapter + class EventedRedis < Base # :nodoc: + @@mutex = Mutex.new + + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + + def broadcast(channel, payload) + redis_connection_for_broadcasts.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback { |reply| success_callback.call } if success_callback + end + end + + def unsubscribe(channel, message_callback) + redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + end + + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + + private + def redis_connection_for_subscriptions + ensure_reactor_running + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end + end + end + end + + def redis_connection_for_broadcasts + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end + end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 560b79df16..7076383efe 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,52 +1,40 @@ require 'thread' -gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' -require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def initialize(*) super - @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + @listener = nil + @redis_connection_for_broadcasts = nil end def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end - def subscribe(channel, message_callback, success_callback = nil) - redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback { |reply| success_callback.call } if success_callback - end + def subscribe(channel, callback, success_callback = nil) + listener.add_subscriber(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + def unsubscribe(channel, callback) + listener.remove_subscriber(channel, callback) end def shutdown - redis_connection_for_subscriptions.pubsub.close_connection - @redis_connection_for_subscriptions = nil + @listener.shutdown if @listener + end + + def redis_connection_for_subscriptions + ::Redis.new(@server.config.cable) end private - def redis_connection_for_subscriptions - ensure_reactor_running - @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." - end - end - end + def listener + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end def redis_connection_for_broadcasts @@ -55,12 +43,120 @@ module ActionCable end end - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? + class Listener < SubscriberMap + def initialize(adapter) + super() + + @adapter = adapter + + @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } + @subscription_lock = Mutex.new + + @raw_client = nil + + @when_connected = [] + + @thread = nil + end + + def listen(conn) + conn.without_reconnect do + original_client = conn.client + + conn.subscribe('_action_cable_internal') do |on| + on.subscribe do |chan, count| + @subscription_lock.synchronize do + if count == 1 + @raw_client = original_client + + until @when_connected.empty? + @when_connected.shift.call + end + end + + if callbacks = @subscribe_callbacks[chan] + next_callback = callbacks.shift + Concurrent.global_io_executor << next_callback if next_callback + @subscribe_callbacks.delete(chan) if callbacks.empty? + end + end + end + + on.message do |chan, message| + broadcast(chan, message) + end + + on.unsubscribe do |chan, count| + if count == 0 + @subscription_lock.synchronize do + @raw_client = nil + end + end + end + end + end + end + + def shutdown + @subscription_lock.synchronize do + return if @thread.nil? + + when_connected do + send_command('unsubscribe') + @raw_client = nil + end + end + + Thread.pass while @thread.alive? + end + + def add_channel(channel, on_success) + @subscription_lock.synchronize do + ensure_listener_running + @subscribe_callbacks[channel] << on_success + when_connected { send_command('subscribe', channel) } + end + end + + def remove_channel(channel) + @subscription_lock.synchronize do + when_connected { send_command('unsubscribe', channel) } + end + end + + def invoke_callback(*) + Concurrent.global_io_executor.post { super } end + + private + def ensure_listener_running + @thread ||= Thread.new do + Thread.current.abort_on_exception = true + + conn = @adapter.redis_connection_for_subscriptions + listen conn + end + end + + def when_connected(&block) + if @raw_client + block.call + else + @when_connected << block + end + end + + def send_command(*command) + @raw_client.write(command) + + very_raw_connection = + @raw_client.connection.instance_variable_defined?(:@connection) && + @raw_client.connection.instance_variable_get(:@connection) + + if very_raw_connection && very_raw_connection.respond_to?(:flush) + very_raw_connection.flush + end + end end end end -- cgit v1.2.3 From 49f6ce63f33b7817bcbd0cdf5f8881b63f40d9c9 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Mon, 1 Feb 2016 14:27:38 -0700 Subject: Preparing for Rails 5.0.0.beta2 --- actioncable/lib/action_cable/gem_version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index c652fb91ae..a71603e61a 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -8,7 +8,7 @@ module ActionCable MAJOR = 5 MINOR = 0 TINY = 0 - PRE = "beta1.1" + PRE = "beta2" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end -- cgit v1.2.3 From 830543738507f49444956b3b6ae897f4638b2523 Mon Sep 17 00:00:00 2001 From: Nick Quaranto Date: Tue, 2 Feb 2016 11:16:41 -0500 Subject: [ci skip] Several ActionCable documentation updates: * Properly indent code sample in ActionCable::Channel::Streams * Add a doc comment for #stop_all_streams * Reformat + add blocks around code references in ActionCable::Base docs * Clarify and a little better grammar on ActionCable::RemoteConnections * Correct indentation and clean up ActionCable::Server::Broadcasting code sample --- actioncable/lib/action_cable/channel/base.rb | 33 ++++++++++++++-------- actioncable/lib/action_cable/channel/streams.rb | 26 +++++++++-------- actioncable/lib/action_cable/remote_connections.rb | 12 ++++---- .../lib/action_cable/server/broadcasting.rb | 22 +++++++-------- 4 files changed, 53 insertions(+), 40 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 88cdc1cab1..874ebe2e71 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -32,8 +32,11 @@ module ActionCable # # == Action processing # - # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can - # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client. + # Unlike subclasses of ActionController::Base, channels do not follow a REST + # constraint form for their actions. Instead, ActionCable operates through a + # remote-procedure call model. You can declare any public method on the + # channel (optionally taking a data argument), and this method is + # automatically exposed as callable to the client. # # Example: # @@ -60,18 +63,22 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away - # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then - # uses as part of its model call. #away does not, it's simply a trigger action. + # In this example, subscribed/unsubscribed are not callable methods, as they + # were already declared in ActionCable::Channel::Base, but #appear + # and #away are. #generate_connection_token is also not + # callable as it's a private method. You'll see that appear accepts a data + # parameter, which it then uses as part of its model call. #away + # does not, since it's simply a trigger action. # - # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. - # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # Also note that in this example, current_user is available because + # it was marked as an identifying attribute on the connection. All such + # identifiers will automatically create a delegation method of the same name + # on the channel instance. # # == Rejecting subscription requests # - # A channel can reject a subscription request in the #subscribed callback by invoking #reject! - # - # Example: + # A channel can reject a subscription request in the #subscribed callback by + # invoking the #reject method: # # class ChatChannel < ApplicationCable::Channel # def subscribed @@ -80,8 +87,10 @@ module ActionCable # end # end # - # In this example, the subscription will be rejected if the current_user does not have access to the chat room. - # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. + # In this example, the subscription will be rejected if the + # current_user does not have access to the chat room. On the + # client-side, the Channel#rejected callback will get invoked when + # the server rejects the subscription request. class Base include Callbacks include PeriodicTimers diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index a26373e387..3158f30814 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -41,22 +41,23 @@ module ActionCable # Example below shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel - # def subscribed - # @room = Chat::Room[params[:room_number]] + # def subscribed + # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) + # stream_for @room, -> (encoded_message) do + # message = ActiveSupport::JSON.decode(encoded_message) # - # if message['originated_at'].present? - # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) + # if message['originated_at'].present? + # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # - # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing - # logger.info "Message took #{elapsed_time}s to arrive" - # end + # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing + # logger.info "Message took #{elapsed_time}s to arrive" + # end # - # transmit message - # end - # end + # transmit message + # end + # end + # end # # You can stop streaming from all broadcasts by calling #stop_all_streams. module Streams @@ -90,6 +91,7 @@ module ActionCable stream_from(broadcasting_for([ channel_name, model ]), callback) end + # Unsubscribes all streams associated with this channel from the pubsub queue. def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index aa2fc95d2f..7ec121308a 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -1,6 +1,7 @@ module ActionCable - # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by - # searching the identifier declared on the connection. Example: + # If you need to disconnect a given connection, you can go through the + # RemoteConnections. You can find the connections you're looking for by + # searching for the identifier declared on the connection. For example: # # module ApplicationCable # class Connection < ActionCable::Connection::Base @@ -11,8 +12,9 @@ module ActionCable # # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # - # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses - # the internal channel that all these servers are subscribed to). + # This will disconnect all the connections established for + # User.find(1) across all servers running on all machines, because + # it uses the internal channel that all these servers are subscribed to. class RemoteConnections attr_reader :server @@ -25,7 +27,7 @@ module ActionCable end private - # Represents a single remote connection found via ActionCable.server.remote_connections.where(*). + # Represents a single remote connection found via ActionCable.server.remote_connections.where(*). # Exists for the solely for the purpose of calling #disconnect on that connection. class RemoteConnection class InvalidIdentifiersError < StandardError; end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 4a26ed9269..7e8aef45f4 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -4,19 +4,19 @@ module ActionCable # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel - # def subscribed - # stream_from "web_notifications_#{current_user.id}" - # end - # end + # def subscribed + # stream_from "web_notifications_#{current_user.id}" + # end + # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob - # ActionCable.server.broadcast \ - # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' } + # # Somewhere in your app this is called, perhaps from a NewCommentJob + # ActionCable.server.broadcast \ + # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side coffescript, which assumes you've already requested the right to send web notifications - # App.cable.subscriptions.create "WebNotificationsChannel", - # received: (data) -> - # new Notification data['title'], body: data['body'] + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications + # App.cable.subscriptions.create "WebNotificationsChannel", + # received: (data) -> + # new Notification data['title'], body: data['body'] module Broadcasting # Broadcast a hash directly to a named broadcasting. It'll automatically be JSON encoded. def broadcast(broadcasting, message) -- cgit v1.2.3 From 5e5fd246d5852b1c49dfdb8e635fb2e2c6ae8e55 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Thu, 4 Feb 2016 12:10:35 +0100 Subject: Allow for non-standard redis connectors --- .../lib/action_cable/subscription_adapter/evented_redis.rb | 12 ++++++++++-- actioncable/lib/action_cable/subscription_adapter/redis.rb | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index d697548cbd..af04a58c70 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -13,6 +13,14 @@ module ActionCable class EventedRedis < Base # :nodoc: @@mutex = Mutex.new + # Overwrite this factory method for EventMachine redis connections if you want to use a different Redis library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil @@ -41,7 +49,7 @@ module ActionCable def redis_connection_for_subscriptions ensure_reactor_running @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." end @@ -51,7 +59,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 7076383efe..ba4934a264 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -6,6 +6,10 @@ require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @listener = nil @@ -39,7 +43,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end -- cgit v1.2.3 From cdb6f2eb9e9e60d0c6aa6ddcb854f74595fa5f06 Mon Sep 17 00:00:00 2001 From: "yuuji.yaginuma" Date: Fri, 5 Feb 2016 21:52:59 +0900 Subject: =?UTF-8?q?don=E2=80=99t=20explicitly=20mention=20EventMachine=20[?= =?UTF-8?q?ci=20skip]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow up to 6accef4e11b0c793e1c085536b5ed27f32b6a0c3 --- actioncable/lib/rails/generators/channel/templates/channel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable/lib') diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 6cf04ee61f..7bff3341c1 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,4 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading. +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed -- cgit v1.2.3