From 9ea7aa84d16d99fd32ed1877e3fd6631a41e7042 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Wed, 27 Jan 2016 14:33:15 +0100 Subject: Revert "Eliminate the EventMachine dependency" --- actioncable/actioncable.gemspec | 3 +- .../lib/action_cable/channel/periodic_timers.rb | 4 +- actioncable/lib/action_cable/channel/streams.rb | 2 +- actioncable/lib/action_cable/connection.rb | 5 +- actioncable/lib/action_cable/connection/base.rb | 32 ++--- .../lib/action_cable/connection/client_socket.rb | 152 --------------------- .../action_cable/connection/internal_channel.rb | 4 +- actioncable/lib/action_cable/connection/stream.rb | 59 -------- .../action_cable/connection/stream_event_loop.rb | 68 --------- .../lib/action_cable/connection/web_socket.rb | 22 ++- actioncable/lib/action_cable/process/logging.rb | 7 + actioncable/lib/action_cable/server.rb | 4 + actioncable/lib/action_cable/server/base.rb | 4 - actioncable/lib/action_cable/server/connections.rb | 8 +- .../lib/action_cable/subscription_adapter/async.rb | 4 +- .../subscription_adapter/postgresql.rb | 4 +- .../lib/action_cable/subscription_adapter/redis.rb | 16 --- actioncable/test/channel/periodic_timers_test.rb | 2 +- actioncable/test/channel/stream_test.rb | 22 +-- actioncable/test/connection/base_test.rb | 19 ++- actioncable/test/connection/identifier_test.rb | 4 +- .../test/connection/multiple_identifiers_test.rb | 4 +- actioncable/test/stubs/test_server.rb | 3 +- actioncable/test/subscription_adapter/common.rb | 3 + actioncable/test/test_helper.rb | 26 +++- 25 files changed, 99 insertions(+), 382 deletions(-) delete mode 100644 actioncable/lib/action_cable/connection/client_socket.rb delete mode 100644 actioncable/lib/action_cable/connection/stream.rb delete mode 100644 actioncable/lib/action_cable/connection/stream_event_loop.rb create mode 100644 actioncable/lib/action_cable/process/logging.rb (limited to 'actioncable') diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 14f968f1ef..a36acc8f6f 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,7 +21,8 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'nio4r', '~> 1.2' + s.add_dependency 'eventmachine', '~> 1.0' + s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_development_dependency 'em-hiredis', '~> 0.3.0' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 56597d02d7..7f0fb37afc 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do + active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.shutdown } + active_periodic_timers.each { |timer| timer.cancel } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index a26373e387..e2876ef6fa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - Concurrent.global_io_executor.post do + EM.next_tick do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 902efb07e2..b672e00682 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,15 +5,12 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base - autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :Stream - autoload :StreamEventLoop + autoload :WebSocket autoload :Subscriptions autoload :TaggedLoggerProxy - autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 0016d1a1a4..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :stream_event_loop, :worker_pool, :pubsub, to: :server + delegate :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) + @websocket = ActionCable::Connection::WebSocket.new(env) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,6 +70,10 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? + websocket.on(:open) { |event| send_async :on_open } + websocket.on(:message) { |event| on_message event.data } + websocket.on(:close) { |event| send_async :on_close } + respond_to_successful_request else respond_to_invalid_request @@ -117,22 +121,6 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end - def on_open # :nodoc: - send_async :handle_open - end - - def on_message(message) # :nodoc: - message_buffer.append message - end - - def on_error(message) # :nodoc: - # ignore - end - - def on_close # :nodoc: - send_async :handle_close - end - protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -151,7 +139,7 @@ module ActionCable attr_reader :message_buffer private - def handle_open + def on_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -162,7 +150,11 @@ module ActionCable respond_to_invalid_request end - def handle_close + def on_message(message) + message_buffer.append message + end + + def on_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb deleted file mode 100644 index 62dd753646..0000000000 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ /dev/null @@ -1,152 +0,0 @@ -require 'websocket/driver' - -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class ClientSocket # :nodoc: - def self.determine_url(env) - scheme = secure_request?(env) ? 'wss:' : 'ws:' - "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" - end - - def self.secure_request?(env) - return true if env['HTTPS'] == 'on' - return true if env['HTTP_X_FORWARDED_SSL'] == 'on' - return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' - return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' - return true if env['rack.url_scheme'] == 'https' - - return false - end - - CONNECTING = 0 - OPEN = 1 - CLOSING = 2 - CLOSED = 3 - - attr_reader :env, :url - - def initialize(env, event_target, stream_event_loop) - @env = env - @event_target = event_target - @stream_event_loop = stream_event_loop - - @url = ClientSocket.determine_url(@env) - - @driver = @driver_started = nil - - @ready_state = CONNECTING - - # The driver calls +env+, +url+, and +write+ - @driver = ::WebSocket::Driver.rack(self) - - @driver.on(:open) { |e| open } - @driver.on(:message) { |e| receive_message(e.data) } - @driver.on(:close) { |e| begin_close(e.reason, e.code) } - @driver.on(:error) { |e| emit_error(e.message) } - - @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) - - if callback = @env['async.callback'] - callback.call([101, {}, @stream]) - end - end - - def start_driver - return if @driver.nil? || @driver_started - @driver_started = true - @driver.start - end - - def rack_response - start_driver - [ -1, {}, [] ] - end - - def write(data) - @stream.write(data) - end - - def transmit(message) - return false if @ready_state > OPEN - case message - when Numeric then @driver.text(message.to_s) - when String then @driver.text(message) - when Array then @driver.binary(message) - else false - end - end - - def close(code = nil, reason = nil) - code ||= 1000 - reason ||= '' - - unless code == 1000 or (code >= 3000 and code <= 4999) - raise ArgumentError, "Failed to execute 'close' on WebSocket: " + - "The code must be either 1000, or between 3000 and 4999. " + - "#{code} is neither." - end - - @ready_state = CLOSING unless @ready_state == CLOSED - @driver.close(reason, code) - end - - def parse(data) - @driver.parse(data) - end - - def client_gone - finalize_close - end - - def alive? - @ready_state == OPEN - end - - private - def open - return unless @ready_state == CONNECTING - @ready_state = OPEN - - @event_target.on_open - end - - def receive_message(data) - return unless @ready_state == OPEN - - @event_target.on_message(data) - end - - def emit_error(message) - return if @ready_state >= CLOSING - - @event_target.on_error(message) - end - - def begin_close(reason, code) - return if @ready_state == CLOSED - @ready_state = CLOSING - @close_params = [reason, code] - - if @stream - @stream.shutdown - else - finalize_close - end - end - - def finalize_close - return if @ready_state == CLOSED - @ready_state = CLOSED - - reason = @close_params ? @close_params[0] : '' - code = @close_params ? @close_params[1] : 1006 - - @event_target.on_close(code, reason) - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 27826792b3..54ed7672d2 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } + EM.next_tick { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb deleted file mode 100644 index ace250cd16..0000000000 --- a/actioncable/lib/action_cable/connection/stream.rb +++ /dev/null @@ -1,59 +0,0 @@ -module ActionCable - module Connection - #-- - # This class is heavily based on faye-websocket-ruby - # - # Copyright (c) 2010-2015 James Coglan - class Stream - def initialize(event_loop, socket) - @event_loop = event_loop - @socket_object = socket - @stream_send = socket.env['stream.send'] - - @rack_hijack_io = nil - - hijack_rack_socket - end - - def each(&callback) - @stream_send ||= callback - end - - def close - shutdown - @socket_object.client_gone - end - - def shutdown - clean_rack_hijack - end - - def write(data) - return @rack_hijack_io.write(data) if @rack_hijack_io - return @stream_send.call(data) if @stream_send - rescue EOFError - @socket_object.client_gone - end - - def receive(data) - @socket_object.parse(data) - end - - private - def hijack_rack_socket - return unless @socket_object.env['rack.hijack'] - - @socket_object.env['rack.hijack'].call - @rack_hijack_io = @socket_object.env['rack.hijack_io'] - - @event_loop.attach(@rack_hijack_io, self) - end - - def clean_rack_hijack - return unless @rack_hijack_io - @event_loop.detach(@rack_hijack_io, self) - @rack_hijack_io = nil - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb deleted file mode 100644 index f773814973..0000000000 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ /dev/null @@ -1,68 +0,0 @@ -require 'nio' - -module ActionCable - module Connection - class StreamEventLoop - def initialize - @nio = NIO::Selector.new - @map = {} - @stopping = false - @todo = Queue.new - - Thread.new do - Thread.current.abort_on_exception = true - run - end - end - - def attach(io, stream) - @todo << lambda do - @map[io] = stream - @nio.register(io, :r) - end - @nio.wakeup - end - - def detach(io, stream) - @todo << lambda do - @nio.deregister(io) - @map.delete io - end - @nio.wakeup - end - - def stop - @stopping = true - @nio.wakeup - end - - def run - loop do - if @stopping - @nio.close - break - end - - until @todo.empty? - @todo.pop(true).call - end - - if monitors = @nio.select - monitors.each do |monitor| - io = monitor.io - stream = @map[io] - - begin - stream.receive io.read_nonblock(4096) - rescue IO::WaitReadable - next - rescue EOFError - stream.close - end - end - end - end - end - end - end -end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 5e89fb9b72..670d5690ae 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,11 +1,13 @@ -require 'websocket/driver' +require 'faye/websocket' module ActionCable module Connection - # Wrap the real socket to minimize the externally-presented API + # Decorate the Faye::WebSocket with helpers we need. class WebSocket - def initialize(env, event_target, stream_event_loop) - @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil + delegate :rack_response, :close, :on, to: :websocket + + def initialize(env) + @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil end def possible? @@ -13,19 +15,11 @@ module ActionCable end def alive? - websocket && websocket.alive? + websocket && websocket.ready_state == Faye::WebSocket::API::OPEN end def transmit(data) - websocket.transmit data - end - - def close - websocket.close - end - - def rack_response - websocket.rack_response + websocket.send data end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb new file mode 100644 index 0000000000..dce637b3ca --- /dev/null +++ b/actioncable/lib/action_cable/process/logging.rb @@ -0,0 +1,7 @@ +require 'action_cable/server' +require 'eventmachine' + +EM.error_handler do |e| + puts "Error raised inside the event loop: #{e.message}" + puts e.backtrace.join("\n") +end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index bd6a3826a3..a2a89d5f1e 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,3 +1,7 @@ +require 'eventmachine' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index b00abd208c..3385a4c9f3 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,10 +32,6 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new - end - # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 8671dd5ebd..47dcea8c20 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,9 +22,11 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do - Concurrent.global_io_executor.post { connections.map(&:beat) } - end.tap(&:execute) + EM.next_tick do + @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do + EM.next_tick { connections.map(&:beat) } + end + end end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..85d4892e4c 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..78f8aeb599 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + ::EM.next_tick(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..3b86354621 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,18 +1,11 @@ -require 'thread' - gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -34,7 +27,6 @@ module ActionCable private def redis_connection_for_subscriptions - ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -45,14 +37,6 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end - - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end end end end diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 64f0247cd6..1590a12f09 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - Concurrent::TimerTask.expects(:new).times(2).returns(true) + EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 947efd96d4..3fa2b291b7 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,7 +31,9 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_for" do run_in_eventmachine do connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + EM.next_tick do + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + end channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -39,35 +41,39 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end test "stream_from subscription confirmation" do - run_in_eventmachine do + EM.run do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - wait_for_async + EM::Timer.new(0.1) do + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + EM.run_deferred_callbacks + EM.stop + end end end test "subscription confirmation should only be sent out once" do - run_in_eventmachine do + EM.run do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - wait_for_async + EM.run_deferred_callbacks expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size + EM.stop end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index e2b017a9a1..182562db82 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,8 +37,6 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process assert connection.websocket.possible? - - wait_for_async assert connection.websocket.alive? end end @@ -55,15 +53,16 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase test "on connection open" do run_in_eventmachine do connection = open_connection + connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - connection.process - wait_for_async - - assert_equal [ connection ], @server.connections - assert connection.connected + # Allow EM to run on_open callback + EM.next_tick do + assert_equal [ connection ], @server.connections + assert connection.connected + end end end @@ -73,12 +72,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - Concurrent::TimerTask.stubs(:new).returns(true) - connection.send :handle_open + EventMachine.stubs(:add_periodic_timer).returns(true) + connection.send :on_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :handle_close + connection.send :on_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index 1019ad541e..a110dfdee0 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :handle_open + @connection.send :on_open end def close_connection - @connection.send :handle_close + @connection.send :on_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index e9bb4e6d7f..55a9f96cb3 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :handle_open + @connection.send :on_open end def close_connection - @connection.send :handle_close + @connection.send :on_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 56d132b30a..6e6541a952 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,7 +14,6 @@ class TestServer @config.subscription_adapter.new(self) end - def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + def send_async end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 361858784e..d4a13be889 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,6 +1,7 @@ require 'test_helper' require 'concurrent' +require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' @@ -23,6 +24,8 @@ module CommonSubscriptionAdapterTest # and now the "real" setup for our test: + spawn_eventmachine + server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 8ddbd4e764..6636ce078b 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,16 +13,28 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } +require 'faye/websocket' +class << Faye::WebSocket + remove_method :ensure_reactor_running + + # We don't want Faye to start the EM reactor in tests because it makes testing much harder. + # We want to be able to start and stop EM loop in tests to make things simpler. + def ensure_reactor_running + # no-op + end +end + class ActionCable::TestCase < ActiveSupport::TestCase - def wait_for_async - e = Concurrent.global_io_executor - until e.completed_task_count == e.scheduled_task_count - sleep 0.1 + def run_in_eventmachine + EM.run do + yield + + EM.run_deferred_callbacks + EM.stop end end - def run_in_eventmachine - yield - wait_for_async + def spawn_eventmachine + Thread.new { EventMachine.run } unless EventMachine.reactor_running? end end -- cgit v1.2.3 From 74497eabd52f2f9f8c383808b11286283046c2b2 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 15:25:31 +1030 Subject: Revert "Revert "Eliminate the EventMachine dependency"" --- actioncable/actioncable.gemspec | 3 +- .../lib/action_cable/channel/periodic_timers.rb | 4 +- actioncable/lib/action_cable/channel/streams.rb | 2 +- actioncable/lib/action_cable/connection.rb | 5 +- actioncable/lib/action_cable/connection/base.rb | 32 +++-- .../lib/action_cable/connection/client_socket.rb | 152 +++++++++++++++++++++ .../action_cable/connection/internal_channel.rb | 4 +- actioncable/lib/action_cable/connection/stream.rb | 59 ++++++++ .../action_cable/connection/stream_event_loop.rb | 68 +++++++++ .../lib/action_cable/connection/web_socket.rb | 22 +-- actioncable/lib/action_cable/process/logging.rb | 7 - actioncable/lib/action_cable/server.rb | 4 - actioncable/lib/action_cable/server/base.rb | 4 + actioncable/lib/action_cable/server/connections.rb | 8 +- .../lib/action_cable/subscription_adapter/async.rb | 4 +- .../subscription_adapter/postgresql.rb | 4 +- .../lib/action_cable/subscription_adapter/redis.rb | 16 +++ actioncable/test/channel/periodic_timers_test.rb | 2 +- actioncable/test/channel/stream_test.rb | 22 ++- actioncable/test/connection/base_test.rb | 19 +-- actioncable/test/connection/identifier_test.rb | 4 +- .../test/connection/multiple_identifiers_test.rb | 4 +- actioncable/test/stubs/test_server.rb | 3 +- actioncable/test/subscription_adapter/common.rb | 3 - actioncable/test/test_helper.rb | 26 +--- 25 files changed, 382 insertions(+), 99 deletions(-) create mode 100644 actioncable/lib/action_cable/connection/client_socket.rb create mode 100644 actioncable/lib/action_cable/connection/stream.rb create mode 100644 actioncable/lib/action_cable/connection/stream_event_loop.rb delete mode 100644 actioncable/lib/action_cable/process/logging.rb (limited to 'actioncable') diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index a36acc8f6f..14f968f1ef 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,8 +21,7 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'eventmachine', '~> 1.0' - s.add_dependency 'faye-websocket', '~> 0.10.0' + s.add_dependency 'nio4r', '~> 1.2' s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_development_dependency 'em-hiredis', '~> 0.3.0' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..56597d02d7 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do + active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.cancel } + active_periodic_timers.each { |timer| timer.shutdown } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e2876ef6fa..a26373e387 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick do + Concurrent.global_io_executor.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index b672e00682..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,12 +5,15 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base + autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :WebSocket + autoload :Stream + autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..0016d1a1a4 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + delegate :stream_event_loop, :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env) + @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,10 +70,6 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? - websocket.on(:open) { |event| send_async :on_open } - websocket.on(:message) { |event| on_message event.data } - websocket.on(:close) { |event| send_async :on_close } - respond_to_successful_request else respond_to_invalid_request @@ -121,6 +117,22 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end + def on_open # :nodoc: + send_async :handle_open + end + + def on_message(message) # :nodoc: + message_buffer.append message + end + + def on_error(message) # :nodoc: + # ignore + end + + def on_close # :nodoc: + send_async :handle_close + end + protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -139,7 +151,7 @@ module ActionCable attr_reader :message_buffer private - def on_open + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -150,11 +162,7 @@ module ActionCable respond_to_invalid_request end - def on_message(message) - message_buffer.append message - end - - def on_close + def handle_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb new file mode 100644 index 0000000000..62dd753646 --- /dev/null +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -0,0 +1,152 @@ +require 'websocket/driver' + +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class ClientSocket # :nodoc: + def self.determine_url(env) + scheme = secure_request?(env) ? 'wss:' : 'ws:' + "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" + end + + def self.secure_request?(env) + return true if env['HTTPS'] == 'on' + return true if env['HTTP_X_FORWARDED_SSL'] == 'on' + return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' + return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' + return true if env['rack.url_scheme'] == 'https' + + return false + end + + CONNECTING = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 + + attr_reader :env, :url + + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + @stream_event_loop = stream_event_loop + + @url = ClientSocket.determine_url(@env) + + @driver = @driver_started = nil + + @ready_state = CONNECTING + + # The driver calls +env+, +url+, and +write+ + @driver = ::WebSocket::Driver.rack(self) + + @driver.on(:open) { |e| open } + @driver.on(:message) { |e| receive_message(e.data) } + @driver.on(:close) { |e| begin_close(e.reason, e.code) } + @driver.on(:error) { |e| emit_error(e.message) } + + @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + + if callback = @env['async.callback'] + callback.call([101, {}, @stream]) + end + end + + def start_driver + return if @driver.nil? || @driver_started + @driver_started = true + @driver.start + end + + def rack_response + start_driver + [ -1, {}, [] ] + end + + def write(data) + @stream.write(data) + end + + def transmit(message) + return false if @ready_state > OPEN + case message + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end + end + + def close(code = nil, reason = nil) + code ||= 1000 + reason ||= '' + + unless code == 1000 or (code >= 3000 and code <= 4999) + raise ArgumentError, "Failed to execute 'close' on WebSocket: " + + "The code must be either 1000, or between 3000 and 4999. " + + "#{code} is neither." + end + + @ready_state = CLOSING unless @ready_state == CLOSED + @driver.close(reason, code) + end + + def parse(data) + @driver.parse(data) + end + + def client_gone + finalize_close + end + + def alive? + @ready_state == OPEN + end + + private + def open + return unless @ready_state == CONNECTING + @ready_state = OPEN + + @event_target.on_open + end + + def receive_message(data) + return unless @ready_state == OPEN + + @event_target.on_message(data) + end + + def emit_error(message) + return if @ready_state >= CLOSING + + @event_target.on_error(message) + end + + def begin_close(reason, code) + return if @ready_state == CLOSED + @ready_state = CLOSING + @close_params = [reason, code] + + if @stream + @stream.shutdown + else + finalize_close + end + end + + def finalize_close + return if @ready_state == CLOSED + @ready_state = CLOSED + + reason = @close_params ? @close_params[0] : '' + code = @close_params ? @close_params[1] : 1006 + + @event_target.on_close(code, reason) + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 54ed7672d2..27826792b3 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, callback) } + Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb new file mode 100644 index 0000000000..ace250cd16 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -0,0 +1,59 @@ +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class Stream + def initialize(event_loop, socket) + @event_loop = event_loop + @socket_object = socket + @stream_send = socket.env['stream.send'] + + @rack_hijack_io = nil + + hijack_rack_socket + end + + def each(&callback) + @stream_send ||= callback + end + + def close + shutdown + @socket_object.client_gone + end + + def shutdown + clean_rack_hijack + end + + def write(data) + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + rescue EOFError + @socket_object.client_gone + end + + def receive(data) + @socket_object.parse(data) + end + + private + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] + + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] + + @event_loop.attach(@rack_hijack_io, self) + end + + def clean_rack_hijack + return unless @rack_hijack_io + @event_loop.detach(@rack_hijack_io, self) + @rack_hijack_io = nil + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb new file mode 100644 index 0000000000..f773814973 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -0,0 +1,68 @@ +require 'nio' + +module ActionCable + module Connection + class StreamEventLoop + def initialize + @nio = NIO::Selector.new + @map = {} + @stopping = false + @todo = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + run + end + end + + def attach(io, stream) + @todo << lambda do + @map[io] = stream + @nio.register(io, :r) + end + @nio.wakeup + end + + def detach(io, stream) + @todo << lambda do + @nio.deregister(io) + @map.delete io + end + @nio.wakeup + end + + def stop + @stopping = true + @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + if monitors = @nio.select + monitors.each do |monitor| + io = monitor.io + stream = @map[io] + + begin + stream.receive io.read_nonblock(4096) + rescue IO::WaitReadable + next + rescue EOFError + stream.close + end + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 670d5690ae..5e89fb9b72 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,13 +1,11 @@ -require 'faye/websocket' +require 'websocket/driver' module ActionCable module Connection - # Decorate the Faye::WebSocket with helpers we need. + # Wrap the real socket to minimize the externally-presented API class WebSocket - delegate :rack_response, :close, :on, to: :websocket - - def initialize(env) - @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + def initialize(env, event_target, stream_event_loop) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil end def possible? @@ -15,11 +13,19 @@ module ActionCable end def alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.alive? end def transmit(data) - websocket.send data + websocket.transmit data + end + + def close + websocket.close + end + + def rack_response + websocket.rack_response end protected diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb deleted file mode 100644 index dce637b3ca..0000000000 --- a/actioncable/lib/action_cable/process/logging.rb +++ /dev/null @@ -1,7 +0,0 @@ -require 'action_cable/server' -require 'eventmachine' - -EM.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") -end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index a2a89d5f1e..bd6a3826a3 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,7 +1,3 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3385a4c9f3..b00abd208c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,6 +32,10 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + end + # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 47dcea8c20..8671dd5ebd 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,11 +22,9 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - EM.next_tick do - @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do - EM.next_tick { connections.map(&:beat) } - end - end + @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do + Concurrent.global_io_executor.post { connections.map(&:beat) } + end.tap(&:execute) end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..c88b03947a 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..3ce1bbed68 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + Concurrent.global_io_executor << callback if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..a035e3988d 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,11 +1,18 @@ +require 'thread' + gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + @@mutex = Mutex.new + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -27,6 +34,7 @@ module ActionCable private def redis_connection_for_subscriptions + ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -37,6 +45,14 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end end end end diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 1590a12f09..64f0247cd6 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) + Concurrent::TimerTask.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 3fa2b291b7..947efd96d4 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,9 +31,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_for" do run_in_eventmachine do connection = TestConnection.new - EM.next_tick do - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - end + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -41,39 +39,35 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end test "stream_from subscription confirmation" do - EM.run do + run_in_eventmachine do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - EM::Timer.new(0.1) do - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + wait_for_async - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - EM.run_deferred_callbacks - EM.stop - end + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" end end test "subscription confirmation should only be sent out once" do - EM.run do + run_in_eventmachine do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - EM.run_deferred_callbacks + wait_for_async expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size - EM.stop end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index 182562db82..e2b017a9a1 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,6 +37,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process assert connection.websocket.possible? + + wait_for_async assert connection.websocket.alive? end end @@ -53,16 +55,15 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase test "on connection open" do run_in_eventmachine do connection = open_connection - connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - # Allow EM to run on_open callback - EM.next_tick do - assert_equal [ connection ], @server.connections - assert connection.connected - end + connection.process + wait_for_async + + assert_equal [ connection ], @server.connections + assert connection.connected end end @@ -72,12 +73,12 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - EventMachine.stubs(:add_periodic_timer).returns(true) - connection.send :on_open + Concurrent::TimerTask.stubs(:new).returns(true) + connection.send :handle_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :on_close + connection.send :handle_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index a110dfdee0..1019ad541e 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index 55a9f96cb3..e9bb4e6d7f 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 6e6541a952..56d132b30a 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,6 +14,7 @@ class TestServer @config.subscription_adapter.new(self) end - def send_async + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new end end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index d4a13be889..361858784e 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,7 +1,6 @@ require 'test_helper' require 'concurrent' -require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' @@ -24,8 +23,6 @@ module CommonSubscriptionAdapterTest # and now the "real" setup for our test: - spawn_eventmachine - server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 6636ce078b..8ddbd4e764 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,28 +13,16 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } -require 'faye/websocket' -class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end -end - class ActionCable::TestCase < ActiveSupport::TestCase - def run_in_eventmachine - EM.run do - yield - - EM.run_deferred_callbacks - EM.stop + def wait_for_async + e = Concurrent.global_io_executor + until e.completed_task_count == e.scheduled_task_count + sleep 0.1 end end - def spawn_eventmachine - Thread.new { EventMachine.run } unless EventMachine.reactor_running? + def run_in_eventmachine + yield + wait_for_async end end -- cgit v1.2.3 From a928aa3d3f1e6f8780acc22d69f4d5d1f5917926 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 25 Jan 2016 03:55:05 +1030 Subject: Fix arguments to on_close --- actioncable/lib/action_cable/connection/base.rb | 2 +- actioncable/lib/action_cable/connection/client_socket.rb | 6 ++---- 2 files changed, 3 insertions(+), 5 deletions(-) (limited to 'actioncable') diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 0016d1a1a4..b5f898436a 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -129,7 +129,7 @@ module ActionCable # ignore end - def on_close # :nodoc: + def on_close(reason, code) # :nodoc: send_async :handle_close end diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb index 62dd753646..ef937d7c16 100644 --- a/actioncable/lib/action_cable/connection/client_socket.rb +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -37,6 +37,7 @@ module ActionCable @url = ClientSocket.determine_url(@env) @driver = @driver_started = nil + @close_params = ['', 1006] @ready_state = CONNECTING @@ -142,10 +143,7 @@ module ActionCable return if @ready_state == CLOSED @ready_state = CLOSED - reason = @close_params ? @close_params[0] : '' - code = @close_params ? @close_params[1] : 1006 - - @event_target.on_close(code, reason) + @event_target.on_close(*@close_params) end end end -- cgit v1.2.3 From 16a6603956551703e3bbd06101c568a73bcdaa52 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 25 Jan 2016 03:53:27 +1030 Subject: Synchronize the lazy setters in Server They're all at risk of races on the first requests. --- actioncable/lib/action_cable/server/base.rb | 23 +++++++++++++++------- .../lib/action_cable/subscription_adapter/async.rb | 4 ++-- .../action_cable/subscription_adapter/inline.rb | 11 ++++++++++- .../subscription_adapter/postgresql.rb | 7 ++++++- .../lib/action_cable/subscription_adapter/redis.rb | 17 ++++++++++++---- 5 files changed, 47 insertions(+), 15 deletions(-) (limited to 'actioncable') diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index b00abd208c..fe48c112df 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,3 +1,5 @@ +require 'thread' + module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -13,7 +15,12 @@ module ActionCable def self.logger; config.logger; end delegate :logger, to: :config + attr_reader :mutex + def initialize + @mutex = Mutex.new + + @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil end # Called by rack to setup the server. @@ -29,29 +36,31 @@ module ActionCable # Gateway to RemoteConnections. See that class for details. def remote_connections - @remote_connections ||= RemoteConnections.new(self) + @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) } end def stream_event_loop - @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new } end # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool - @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) + @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } end # Requires and returns a hash of all the channel class constants keyed by name. def channel_classes - @channel_classes ||= begin - config.channel_paths.each { |channel_path| require channel_path } - config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + @channel_classes || @mutex.synchronize do + @channel_classes ||= begin + config.channel_paths.each { |channel_path| require channel_path } + config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize } + end end end # Adapter used for all streams/broadcasting. def pubsub - @pubsub ||= config.pubsub_adapter.new(self) + @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..cca6894289 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -4,8 +4,8 @@ module ActionCable module SubscriptionAdapter class Async < Inline # :nodoc: private - def subscriber_map - @subscriber_map ||= AsyncSubscriberMap.new + def new_subscriber_map + AsyncSubscriberMap.new end class AsyncSubscriberMap < SubscriberMap diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb index 4a2a8d23a2..81357faead 100644 --- a/actioncable/lib/action_cable/subscription_adapter/inline.rb +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -1,6 +1,11 @@ module ActionCable module SubscriptionAdapter class Inline < Base # :nodoc: + def initialize(*) + super + @subscriber_map = nil + end + def broadcast(channel, payload) subscriber_map.broadcast(channel, payload) end @@ -19,7 +24,11 @@ module ActionCable private def subscriber_map - @subscriber_map ||= SubscriberMap.new + @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map } + end + + def new_subscriber_map + SubscriberMap.new end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..abaeb92e54 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -5,6 +5,11 @@ require 'thread' module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + def initialize(*) + super + @listener = nil + end + def broadcast(channel, payload) with_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") @@ -37,7 +42,7 @@ module ActionCable private def listener - @listener ||= Listener.new(self) + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end class Listener < SubscriberMap diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..560b79df16 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -13,6 +13,11 @@ module ActionCable class Redis < Base # :nodoc: @@mutex = Mutex.new + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -35,15 +40,19 @@ module ActionCable private def redis_connection_for_subscriptions ensure_reactor_running - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end end end end def redis_connection_for_broadcasts - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end end def ensure_reactor_running -- cgit v1.2.3 From ce37de4a19447fc89d2d16f15ba9314fba30d47e Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 25 Jan 2016 03:55:53 +1030 Subject: Add a couple of tests that connect with a WS client --- actioncable/test/client/echo_channel.rb | 13 ++ actioncable/test/client_test.rb | 238 ++++++++++++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100644 actioncable/test/client/echo_channel.rb create mode 100644 actioncable/test/client_test.rb (limited to 'actioncable') diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb new file mode 100644 index 0000000000..9a54080d4d --- /dev/null +++ b/actioncable/test/client/echo_channel.rb @@ -0,0 +1,13 @@ +class EchoChannel < ActionCable::Channel::Base + def subscribed + stream_from "global" + end + + def ding(data) + transmit(dong: data['message']) + end + + def bulk(data) + ActionCable.server.broadcast "global", wide: data['message'] + end +end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb new file mode 100644 index 0000000000..b185654c71 --- /dev/null +++ b/actioncable/test/client_test.rb @@ -0,0 +1,238 @@ +require 'test_helper' +require 'concurrent' + +require 'active_support/core_ext/hash/indifferent_access' +require 'pathname' + +require 'faye/websocket' +require 'json' + +class ClientTest < ActionCable::TestCase + WAIT_WHEN_EXPECTING_EVENT = 3 + WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2 + + def setup + # TODO: ActionCable requires a *lot* of setup at the moment... + ::Object.const_set(:ApplicationCable, Module.new) + ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base)) + + ::Object.const_set(:Rails, Module.new) + ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) } + + ActionCable.instance_variable_set(:@server, nil) + server = ActionCable.server + server.config = ActionCable::Server::Configuration.new + inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN } + server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: []) + + server.config.cable = { adapter: 'async' }.with_indifferent_access + + # and now the "real" setup for our test: + server.config.disable_request_forgery_protection = true + server.config.channel_load_paths = [File.expand_path('client', __dir__)] + + @reactor_mutex = Mutex.new + @reactor_users = 0 + @reactor_running = Concurrent::Event.new + + # faye-websocket is warning-rich + @previous_verbose, $VERBOSE = $VERBOSE, nil + end + + def teardown + $VERBOSE = @previous_verbose + + if @reactor_running.set? + EM.stop + end + + begin + ::Object.send(:remove_const, :ApplicationCable) + rescue NameError + end + begin + ::Object.send(:remove_const, :Rails) + rescue NameError + end + end + + def with_puma_server(rack_app = ActionCable.server, port = 3099) + server = ::Puma::Server.new(rack_app, ::Puma::Events.strings) + server.add_tcp_listener '127.0.0.1', port + server.min_threads = 1 + server.max_threads = 4 + + t = Thread.new { server.run.join } + yield port + + ensure + server.stop(true) + t.join + end + + def start_event_machine + @reactor_mutex.synchronize do + unless @reactor_running.set? + @reactor ||= Thread.new do + EM.next_tick do + @reactor_running.set + end + EM.run + end + @reactor_running.wait(WAIT_WHEN_EXPECTING_EVENT) + end + @reactor_users += 1 + end + end + + def stop_event_machine + @reactor_mutex.synchronize do + @reactor_users -= 1 + if @reactor_users < 1 + @reactor_running.reset + EM.stop + @reactor = nil + end + end + end + + class SyncClient + attr_reader :pings + + def initialize(em_controller, port) + em_controller.start_event_machine + + @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") + @messages = Queue.new + @closed = Concurrent::Event.new + @has_messages = Concurrent::Event.new + @pings = 0 + + open = Concurrent::Event.new + error = nil + + @ws.on(:error) do |event| + if open.set? + @messages << RuntimeError.new(event.message) + else + error = event.message + open.set + end + end + + @ws.on(:open) do |event| + open.set + end + + @ws.on(:message) do |event| + hash = JSON.parse(event.data) + if hash['identifier'] == '_ping' + @pings += 1 + else + @messages << hash + @has_messages.set + end + end + + @ws.on(:close) do |event| + @ws = nil + em_controller.stop_event_machine + @closed.set + end + + open.wait(WAIT_WHEN_EXPECTING_EVENT) + raise error if error + end + + def read_message + @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty? + @has_messages.reset if @messages.size < 2 + + msg = @messages.pop(true) + raise msg if msg.is_a?(Exception) + + msg + end + + def read_messages + list = [] + loop do + @has_messages.wait(WAIT_WHEN_NOT_EXPECTING_EVENT) + if @has_messages.set? + list << read_message + else + break + end + end + list + end + + def send_message(hash) + @ws.send(JSON.dump(hash)) + end + + def close + sleep WAIT_WHEN_NOT_EXPECTING_EVENT + + unless @messages.empty? + raise "#{@messages.size} messages unprocessed" + end + + @ws.close + @closed.wait(WAIT_WHEN_EXPECTING_EVENT) + end + end + + def faye_client(port) + SyncClient.new(self, port) + end + + def test_single_client + with_puma_server do |port| + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) + c.close + end + end + + def test_interacting_clients + with_puma_server do |port| + clients = 20.times.map { faye_client(port) } + + barrier_1 = Concurrent::CyclicBarrier.new(clients.size) + barrier_2 = Concurrent::CyclicBarrier.new(clients.size) + + clients.map {|c| Concurrent::Future.execute { + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + barrier_1.wait WAIT_WHEN_EXPECTING_EVENT + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello') + barrier_2.wait WAIT_WHEN_EXPECTING_EVENT + sleep 1 + assert_equal clients.size, c.read_messages.size + } }.each(&:wait!) + + clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + end + end + + def test_many_clients + with_puma_server do |port| + clients = 200.times.map { faye_client(port) } + + clients.map {|c| Concurrent::Future.execute { + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + } }.each(&:wait!) + + clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) + end + end +end -- cgit v1.2.3 From 786ed1b3ad8eeddb911211b67031016730ed55c8 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 18:46:14 +1030 Subject: Handle more IO errors (especially, ECONNRESET) Also, address the possibility of the listen thread dying and needing to be respawned. As a bonus, we now defer construction of the thread until we are first given something to monitor. --- .../action_cable/connection/stream_event_loop.rb | 68 +++++++++++++++------- actioncable/test/client/echo_channel.rb | 5 ++ actioncable/test/client_test.rb | 26 +++++++++ 3 files changed, 78 insertions(+), 21 deletions(-) (limited to 'actioncable') diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb index f773814973..e6335082d2 100644 --- a/actioncable/lib/action_cable/connection/stream_event_loop.rb +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -1,18 +1,17 @@ require 'nio' +require 'thread' module ActionCable module Connection class StreamEventLoop def initialize - @nio = NIO::Selector.new + @nio = @thread = nil @map = {} @stopping = false @todo = Queue.new - Thread.new do - Thread.current.abort_on_exception = true - run - end + @spawn_mutex = Mutex.new + spawn end def attach(io, stream) @@ -20,34 +19,53 @@ module ActionCable @map[io] = stream @nio.register(io, :r) end - @nio.wakeup + wakeup end def detach(io, stream) @todo << lambda do - @nio.deregister(io) + @nio.deregister io @map.delete io end - @nio.wakeup + wakeup end def stop @stopping = true - @nio.wakeup + wakeup if @nio end - def run - loop do - if @stopping - @nio.close - break - end + private + def spawn + return if @thread && @thread.status + + @spawn_mutex.synchronize do + return if @thread && @thread.status + + @nio ||= NIO::Selector.new + @thread = Thread.new { run } - until @todo.empty? - @todo.pop(true).call + return true end + end + + def wakeup + spawn || @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + next unless monitors = @nio.select - if monitors = @nio.select monitors.each do |monitor| io = monitor.io stream = @map[io] @@ -56,13 +74,21 @@ module ActionCable stream.receive io.read_nonblock(4096) rescue IO::WaitReadable next - rescue EOFError - stream.close + rescue + # We expect one of EOFError or Errno::ECONNRESET in + # normal operation (when the client goes away). But if + # anything else goes wrong, this is still the best way + # to handle it. + begin + stream.close + rescue + @nio.deregister io + @map.delete io + end end end end end - end end end end diff --git a/actioncable/test/client/echo_channel.rb b/actioncable/test/client/echo_channel.rb index 9a54080d4d..63e35f194a 100644 --- a/actioncable/test/client/echo_channel.rb +++ b/actioncable/test/client/echo_channel.rb @@ -7,6 +7,11 @@ class EchoChannel < ActionCable::Channel::Base transmit(dong: data['message']) end + def delay(data) + sleep 1 + transmit(dong: data['message']) + end + def bulk(data) ActionCable.server.broadcast "global", wide: data['message'] end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index b185654c71..7617e93426 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -181,6 +181,15 @@ class ClientTest < ActionCable::TestCase @ws.close @closed.wait(WAIT_WHEN_EXPECTING_EVENT) end + + def close! + sock = BasicSocket.for_fd(@ws.instance_variable_get(:@stream).detach) + + # Force a TCP reset + sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, [1, 0].pack('ii')) + + sock.close + end end def faye_client(port) @@ -235,4 +244,21 @@ class ClientTest < ActionCable::TestCase clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) end end + + def test_disappearing_client + with_puma_server do |port| + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello') + c.close! # disappear before write + + c = faye_client(port) + c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) + c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) + c.close! # disappear before read + end + end end -- cgit v1.2.3 From 0b94afb0757e02deccced2d85b8478b78f269e0b Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 21:19:10 +1030 Subject: Be more patient while gathering the expected responses --- actioncable/test/client_test.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 7617e93426..66fa79afd6 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -154,10 +154,10 @@ class ClientTest < ActionCable::TestCase msg end - def read_messages + def read_messages(expected_size = 0) list = [] loop do - @has_messages.wait(WAIT_WHEN_NOT_EXPECTING_EVENT) + @has_messages.wait(list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT) if @has_messages.set? list << read_message else @@ -222,8 +222,7 @@ class ClientTest < ActionCable::TestCase barrier_1.wait WAIT_WHEN_EXPECTING_EVENT c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello') barrier_2.wait WAIT_WHEN_EXPECTING_EVENT - sleep 1 - assert_equal clients.size, c.read_messages.size + assert_equal clients.size, c.read_messages(clients.size).size } }.each(&:wait!) clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) -- cgit v1.2.3 From 3043601d7ab4a3b0e7eea1c6353ab96258f8a78c Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 22:02:48 +1030 Subject: Reduce the client count, in hope of a more consistent test --- actioncable/test/client_test.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 66fa79afd6..4629d858a1 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -40,12 +40,12 @@ class ClientTest < ActionCable::TestCase end def teardown - $VERBOSE = @previous_verbose - if @reactor_running.set? EM.stop end + $VERBOSE = @previous_verbose + begin ::Object.send(:remove_const, :ApplicationCable) rescue NameError @@ -209,7 +209,7 @@ class ClientTest < ActionCable::TestCase def test_interacting_clients with_puma_server do |port| - clients = 20.times.map { faye_client(port) } + clients = 10.times.map { faye_client(port) } barrier_1 = Concurrent::CyclicBarrier.new(clients.size) barrier_2 = Concurrent::CyclicBarrier.new(clients.size) -- cgit v1.2.3 From 4d01cd1545a00ed6f96d6cb658a590afd36e1871 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 22:46:33 +1030 Subject: Keep the socket reference after close We may still try to send to it. --- actioncable/test/client_test.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 4629d858a1..3d9690fb01 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -66,8 +66,8 @@ class ClientTest < ActionCable::TestCase yield port ensure - server.stop(true) - t.join + server.stop(true) if server + t.join if t end def start_event_machine @@ -135,7 +135,6 @@ class ClientTest < ActionCable::TestCase end @ws.on(:close) do |event| - @ws = nil em_controller.stop_event_machine @closed.set end -- cgit v1.2.3 From 1c893bc2f94fd2e9e2c0d253eac0faedcd0046b8 Mon Sep 17 00:00:00 2001 From: Kesha Antonov Date: Fri, 29 Jan 2016 21:56:51 +0300 Subject: remove require logging --- actioncable/README.md | 2 -- 1 file changed, 2 deletions(-) (limited to 'actioncable') diff --git a/actioncable/README.md b/actioncable/README.md index cad71ddf94..ac57532b62 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -397,8 +397,6 @@ application. The recommended basic setup is as follows: require ::File.expand_path('../../config/environment', __FILE__) Rails.application.eager_load! -require 'action_cable/process/logging' - run ActionCable.server ``` -- cgit v1.2.3 From e6d0d4b1ae1970f3c4379f5bc2c4147a227221c3 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Sat, 30 Jan 2016 06:41:14 +1030 Subject: Don't bother stopping EM between tests It's not strictly necessary, and maybe this will help with the current test failure. --- actioncable/test/client_test.rb | 41 +++-------------------------------------- 1 file changed, 3 insertions(+), 38 deletions(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 3d9690fb01..fd03c2a24c 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -31,19 +31,13 @@ class ClientTest < ActionCable::TestCase server.config.disable_request_forgery_protection = true server.config.channel_load_paths = [File.expand_path('client', __dir__)] - @reactor_mutex = Mutex.new - @reactor_users = 0 - @reactor_running = Concurrent::Event.new + Thread.new { EventMachine.run } unless EventMachine.reactor_running? # faye-websocket is warning-rich @previous_verbose, $VERBOSE = $VERBOSE, nil end def teardown - if @reactor_running.set? - EM.stop - end - $VERBOSE = @previous_verbose begin @@ -70,38 +64,10 @@ class ClientTest < ActionCable::TestCase t.join if t end - def start_event_machine - @reactor_mutex.synchronize do - unless @reactor_running.set? - @reactor ||= Thread.new do - EM.next_tick do - @reactor_running.set - end - EM.run - end - @reactor_running.wait(WAIT_WHEN_EXPECTING_EVENT) - end - @reactor_users += 1 - end - end - - def stop_event_machine - @reactor_mutex.synchronize do - @reactor_users -= 1 - if @reactor_users < 1 - @reactor_running.reset - EM.stop - @reactor = nil - end - end - end - class SyncClient attr_reader :pings - def initialize(em_controller, port) - em_controller.start_event_machine - + def initialize(port) @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/") @messages = Queue.new @closed = Concurrent::Event.new @@ -135,7 +101,6 @@ class ClientTest < ActionCable::TestCase end @ws.on(:close) do |event| - em_controller.stop_event_machine @closed.set end @@ -192,7 +157,7 @@ class ClientTest < ActionCable::TestCase end def faye_client(port) - SyncClient.new(self, port) + SyncClient.new(port) end def test_single_client -- cgit v1.2.3 From 0ae187961c75c44ea418f44ce0c09f78cdf520ff Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Sat, 30 Jan 2016 07:51:41 +1030 Subject: Use a gentler disconnect The detach used by close! seems to be making EM very sad on Travis. --- actioncable/test/client_test.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index fd03c2a24c..d7eecfa322 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -214,14 +214,14 @@ class ClientTest < ActionCable::TestCase c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello') - c.close! # disappear before write + c.close # disappear before write c = faye_client(port) c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) - c.close! # disappear before read + c.close # disappear before read end end end -- cgit v1.2.3 From 896950a605c509f19f3e8cbde11e23ca87036ca3 Mon Sep 17 00:00:00 2001 From: Javan Makhmali Date: Sat, 30 Jan 2016 15:41:14 -0500 Subject: Add task to create precompiled action_cable.js and reorganize to accommodate --- actioncable/.gitignore | 1 + actioncable/Rakefile | 29 ++++++++ actioncable/actioncable.gemspec | 7 +- .../lib/assets/javascripts/action_cable.coffee.erb | 23 ------ .../javascripts/action_cable/connection.coffee | 81 ---------------------- .../action_cable/connection_monitor.coffee | 79 --------------------- .../javascripts/action_cable/consumer.coffee | 25 ------- .../lib/assets/javascripts/action_cable/index.js | 1 + .../action_cable/source/connection.coffee | 81 ++++++++++++++++++++++ .../action_cable/source/connection_monitor.coffee | 79 +++++++++++++++++++++ .../action_cable/source/consumer.coffee | 25 +++++++ .../action_cable/source/index.coffee.erb | 23 ++++++ .../action_cable/source/subscription.coffee | 68 ++++++++++++++++++ .../action_cable/source/subscriptions.coffee | 64 +++++++++++++++++ .../javascripts/action_cable/subscription.coffee | 68 ------------------ .../javascripts/action_cable/subscriptions.coffee | 64 ----------------- 16 files changed, 376 insertions(+), 342 deletions(-) create mode 100644 actioncable/.gitignore delete mode 100644 actioncable/lib/assets/javascripts/action_cable.coffee.erb delete mode 100644 actioncable/lib/assets/javascripts/action_cable/connection.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/consumer.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/index.js create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee create mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/subscription.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee (limited to 'actioncable') diff --git a/actioncable/.gitignore b/actioncable/.gitignore new file mode 100644 index 0000000000..ceeb05b410 --- /dev/null +++ b/actioncable/.gitignore @@ -0,0 +1 @@ +/tmp diff --git a/actioncable/Rakefile b/actioncable/Rakefile index b6c56e9195..9ba431f8a9 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -1,4 +1,8 @@ require 'rake/testtask' +require 'pathname' +require 'sprockets' +require 'coffee-script' +require 'action_cable' dir = File.dirname(__FILE__) @@ -11,3 +15,28 @@ Rake::TestTask.new do |t| t.verbose = true t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION) end + +namespace :assets do + desc "Compile dist/action_cable.js" + task :compile do + asset_mapping = { "source.js" => "action_cable.js" } + + root_path = Pathname.new(dir) + load_path = root_path.join("lib/assets/javascripts/action_cable") + + compile_path = root_path.join("tmp/sprockets") + compile_path.rmtree if compile_path.exist? + compile_path.mkpath + + environment = Sprockets::Environment.new + environment.append_path(load_path) + + manifest = Sprockets::Manifest.new(environment.index, compile_path) + manifest.compile(asset_mapping.keys) + + asset_mapping.each do |logical_path, dist_path| + fingerprint_path = manifest.assets[logical_path] + FileUtils.cp(compile_path.join(fingerprint_path), load_path.join("dist/#{dist_path}")) + end + end +end diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 14f968f1ef..0976895ef7 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -24,9 +24,12 @@ Gem::Specification.new do |s| s.add_dependency 'nio4r', '~> 1.2' s.add_dependency 'websocket-driver', '~> 0.6.1' - s.add_development_dependency 'em-hiredis', '~> 0.3.0' + s.add_development_dependency 'coffee-script', '~> 2.4.1' + s.add_development_dependency 'coffee-script-source', '~> 1.10.0' + s.add_development_dependency 'em-hiredis', '~> 0.3.0' s.add_development_dependency 'mocha' s.add_development_dependency 'pg' s.add_development_dependency 'puma' - s.add_development_dependency 'redis', '~> 3.0' + s.add_development_dependency 'redis', '~> 3.0' + s.add_development_dependency 'sprockets', '~> 3.5.2' end diff --git a/actioncable/lib/assets/javascripts/action_cable.coffee.erb b/actioncable/lib/assets/javascripts/action_cable.coffee.erb deleted file mode 100644 index 7daea4ebcd..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require action_cable/consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/lib/assets/javascripts/action_cable/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/consumer.coffee deleted file mode 100644 index fcd8d0fb6c..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require action_cable/connection -#= require action_cable/connection_monitor -#= require action_cable/subscriptions -#= require action_cable/subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/index.js b/actioncable/lib/assets/javascripts/action_cable/index.js new file mode 100644 index 0000000000..6c69e42337 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/index.js @@ -0,0 +1 @@ +//= require ./dist/action_cable diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee new file mode 100644 index 0000000000..fbd7dbd35b --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee @@ -0,0 +1,81 @@ +# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +{message_types} = ActionCable.INTERNAL + +class ActionCable.Connection + @reopenDelay: 500 + + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @webSocket.send(JSON.stringify(data)) + true + else + false + + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") + else + @webSocket = new WebSocket(@consumer.url) + @installEventHandlers() + true + + close: -> + @webSocket?.close() + + reopen: -> + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) + + isOpen: -> + @isState("open") + + # Private + + isState: (states...) -> + @getState() in states + + getState: -> + return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState + null + + installEventHandlers: -> + for eventName of @events + handler = @events[eventName].bind(this) + @webSocket["on#{eventName}"] = handler + return + + events: + message: (event) -> + {identifier, message, type} = JSON.parse(event.data) + + switch type + when message_types.confirmation + @consumer.subscriptions.notify(identifier, "connected") + when message_types.rejection + @consumer.subscriptions.reject(identifier) + else + @consumer.subscriptions.notify(identifier, "received", message) + + open: -> + @disconnected = false + @consumer.subscriptions.reload() + + close: -> + @disconnect() + + error: -> + @disconnect() + + disconnect: -> + return if @disconnected + @disconnected = true + @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee new file mode 100644 index 0000000000..99b9a1c6d5 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee @@ -0,0 +1,79 @@ +# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting +# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. +class ActionCable.ConnectionMonitor + @pollInterval: + min: 3 + max: 30 + + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: ActionCable.INTERNAL.identifiers.ping + + constructor: (@consumer) -> + @consumer.subscriptions.add(this) + @start() + + connected: -> + @reset() + @pingedAt = now() + delete @disconnectedAt + + disconnected: -> + @disconnectedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @reconnectAttempts = 0 + + start: -> + @reset() + delete @stoppedAt + @startedAt = now() + @poll() + document.addEventListener("visibilitychange", @visibilityDidChange) + + stop: -> + @stoppedAt = now() + document.removeEventListener("visibilitychange", @visibilityDidChange) + + poll: -> + setTimeout => + unless @stoppedAt + @reconnectIfStale() + @poll() + , @getInterval() + + getInterval: -> + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) + clamp(interval, min, max) * 1000 + + reconnectIfStale: -> + if @connectionIsStale() + @reconnectAttempts++ + unless @disconnectedRecently() + @consumer.connection.reopen() + + connectionIsStale: -> + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold + + visibilityDidChange: => + if document.visibilityState is "visible" + setTimeout => + if @connectionIsStale() or not @consumer.connection.isOpen() + @consumer.connection.reopen() + , 200 + + now = -> + new Date().getTime() + + secondsSince = (time) -> + (now() - time) / 1000 + + clamp = (number, min, max) -> + Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee new file mode 100644 index 0000000000..717c0641a9 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee @@ -0,0 +1,25 @@ +#= require ./connection +#= require ./connection_monitor +#= require ./subscriptions +#= require ./subscription + +# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, +# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. +# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription +# method. +# +# The following example shows how this can be setup: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Consumer + constructor: (@url) -> + @subscriptions = new ActionCable.Subscriptions this + @connection = new ActionCable.Connection this + @connectionMonitor = new ActionCable.ConnectionMonitor this + + send: (data) -> + @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb b/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb new file mode 100644 index 0000000000..f4615b7502 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb @@ -0,0 +1,23 @@ +#= require_self +#= require ./consumer + +@ActionCable = + INTERNAL: <%= ActionCable::INTERNAL.to_json %> + + createConsumer: (url = @getConfig("url")) -> + new ActionCable.Consumer @createWebSocketURL(url) + + getConfig: (name) -> + element = document.head.querySelector("meta[name='action-cable-#{name}']") + element?.getAttribute("content") + + createWebSocketURL: (url) -> + if url and not /^wss?:/i.test(url) + a = document.createElement("a") + a.href = url + # Fix populating Location properties in IE. Otherwise, protocol will be blank. + a.href = a.href + a.protocol = a.protocol.replace("http", "ws") + a.href + else + url diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee new file mode 100644 index 0000000000..339d676933 --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee @@ -0,0 +1,68 @@ +# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +# Channel instance on the server side. +# +# An example demonstrates the basic functionality: +# +# App.appearance = App.cable.subscriptions.create "AppearanceChannel", +# connected: -> +# # Called once the subscription has been successfully completed +# +# appear: -> +# @perform 'appear', appearing_on: @appearingOn() +# +# away: -> +# @perform 'away' +# +# appearingOn: -> +# $('main').data 'appearing-on' +# +# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server +# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). +# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. +# +# This is how the server component would look: +# +# class AppearanceChannel < ApplicationActionCable::Channel +# def subscribed +# current_user.appear +# end +# +# def unsubscribed +# current_user.disappear +# end +# +# def appear(data) +# current_user.appear on: data['appearing_on'] +# end +# +# def away +# current_user.away +# end +# end +# +# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. +# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. +class ActionCable.Subscription + constructor: (@subscriptions, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @subscriptions.add(this) + @consumer = @subscriptions.consumer + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @subscriptions.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee new file mode 100644 index 0000000000..ae041ffa2b --- /dev/null +++ b/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee @@ -0,0 +1,64 @@ +# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user +# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Subscriptions + constructor: (@consumer) -> + @subscriptions = [] + + create: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new ActionCable.Subscription this, params, mixin + + # Private + + add: (subscription) -> + @subscriptions.push(subscription) + @notify(subscription, "initialized") + @sendCommand(subscription, "subscribe") + + remove: (subscription) -> + @forget(subscription) + + unless @findAll(subscription.identifier).length + @sendCommand(subscription, "unsubscribe") + + reject: (identifier) -> + for subscription in @findAll(identifier) + @forget(subscription) + @notify(subscription, "rejected") + + forget: (subscription) -> + @subscriptions = (s for s in @subscriptions when s isnt subscription) + + findAll: (identifier) -> + s for s in @subscriptions when s.identifier is identifier + + reload: -> + for subscription in @subscriptions + @sendCommand(subscription, "subscribe") + + notifyAll: (callbackName, args...) -> + for subscription in @subscriptions + @notify(subscription, callbackName, args...) + + notify: (subscription, callbackName, args...) -> + if typeof subscription is "string" + subscriptions = @findAll(subscription) + else + subscriptions = [subscription] + + for subscription in subscriptions + subscription[callbackName]?(args...) + + sendCommand: (subscription, command) -> + {identifier} = subscription + if identifier is ActionCable.INTERNAL.identifiers.ping + @consumer.connection.isOpen() + else + @consumer.send({command, identifier}) diff --git a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) -- cgit v1.2.3 From 09a706065952d58d515420b19a55df619eb7f53d Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Sat, 30 Jan 2016 20:39:22 -0500 Subject: Improvements and reorganization of assets --- actioncable/.gitignore | 1 + actioncable/Rakefile | 10 ++- .../app/assets/javascripts/action_cable/index.js | 1 + .../action_cable/source/connection.coffee | 81 ++++++++++++++++++++++ .../action_cable/source/connection_monitor.coffee | 79 +++++++++++++++++++++ .../action_cable/source/consumer.coffee | 25 +++++++ .../action_cable/source/index.coffee.erb | 23 ++++++ .../action_cable/source/subscription.coffee | 68 ++++++++++++++++++ .../action_cable/source/subscriptions.coffee | 64 +++++++++++++++++ .../lib/assets/javascripts/action_cable/index.js | 1 - .../action_cable/source/connection.coffee | 81 ---------------------- .../action_cable/source/connection_monitor.coffee | 79 --------------------- .../action_cable/source/consumer.coffee | 25 ------- .../action_cable/source/index.coffee.erb | 23 ------ .../action_cable/source/subscription.coffee | 68 ------------------ .../action_cable/source/subscriptions.coffee | 64 ----------------- 16 files changed, 350 insertions(+), 343 deletions(-) create mode 100644 actioncable/app/assets/javascripts/action_cable/index.js create mode 100644 actioncable/app/assets/javascripts/action_cable/source/connection.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/source/consumer.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb create mode 100644 actioncable/app/assets/javascripts/action_cable/source/subscription.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/index.js delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee delete mode 100644 actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee (limited to 'actioncable') diff --git a/actioncable/.gitignore b/actioncable/.gitignore index ceeb05b410..8ded114548 100644 --- a/actioncable/.gitignore +++ b/actioncable/.gitignore @@ -1 +1,2 @@ +/lib/assets/javascripts/action_cable.js /tmp diff --git a/actioncable/Rakefile b/actioncable/Rakefile index 9ba431f8a9..0a036e3e3d 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -19,10 +19,13 @@ end namespace :assets do desc "Compile dist/action_cable.js" task :compile do + puts 'Compiling Action Cable assets...' + asset_mapping = { "source.js" => "action_cable.js" } root_path = Pathname.new(dir) - load_path = root_path.join("lib/assets/javascripts/action_cable") + load_path = root_path.join("app/assets/javascripts/action_cable") + destination_path = root_path.join("lib/assets/javascripts") compile_path = root_path.join("tmp/sprockets") compile_path.rmtree if compile_path.exist? @@ -36,7 +39,10 @@ namespace :assets do asset_mapping.each do |logical_path, dist_path| fingerprint_path = manifest.assets[logical_path] - FileUtils.cp(compile_path.join(fingerprint_path), load_path.join("dist/#{dist_path}")) + FileUtils.cp(compile_path.join(fingerprint_path), destination_path.join(dist_path)) end + + puts '======' + puts 'Action Cable assets compiled successfully!' end end diff --git a/actioncable/app/assets/javascripts/action_cable/index.js b/actioncable/app/assets/javascripts/action_cable/index.js new file mode 100644 index 0000000000..e97870c3b0 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/index.js @@ -0,0 +1 @@ +//= require_tree ./source diff --git a/actioncable/app/assets/javascripts/action_cable/source/connection.coffee b/actioncable/app/assets/javascripts/action_cable/source/connection.coffee new file mode 100644 index 0000000000..fbd7dbd35b --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/source/connection.coffee @@ -0,0 +1,81 @@ +# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +{message_types} = ActionCable.INTERNAL + +class ActionCable.Connection + @reopenDelay: 500 + + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @webSocket.send(JSON.stringify(data)) + true + else + false + + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") + else + @webSocket = new WebSocket(@consumer.url) + @installEventHandlers() + true + + close: -> + @webSocket?.close() + + reopen: -> + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) + + isOpen: -> + @isState("open") + + # Private + + isState: (states...) -> + @getState() in states + + getState: -> + return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState + null + + installEventHandlers: -> + for eventName of @events + handler = @events[eventName].bind(this) + @webSocket["on#{eventName}"] = handler + return + + events: + message: (event) -> + {identifier, message, type} = JSON.parse(event.data) + + switch type + when message_types.confirmation + @consumer.subscriptions.notify(identifier, "connected") + when message_types.rejection + @consumer.subscriptions.reject(identifier) + else + @consumer.subscriptions.notify(identifier, "received", message) + + open: -> + @disconnected = false + @consumer.subscriptions.reload() + + close: -> + @disconnect() + + error: -> + @disconnect() + + disconnect: -> + return if @disconnected + @disconnected = true + @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee new file mode 100644 index 0000000000..99b9a1c6d5 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee @@ -0,0 +1,79 @@ +# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting +# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. +class ActionCable.ConnectionMonitor + @pollInterval: + min: 3 + max: 30 + + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: ActionCable.INTERNAL.identifiers.ping + + constructor: (@consumer) -> + @consumer.subscriptions.add(this) + @start() + + connected: -> + @reset() + @pingedAt = now() + delete @disconnectedAt + + disconnected: -> + @disconnectedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @reconnectAttempts = 0 + + start: -> + @reset() + delete @stoppedAt + @startedAt = now() + @poll() + document.addEventListener("visibilitychange", @visibilityDidChange) + + stop: -> + @stoppedAt = now() + document.removeEventListener("visibilitychange", @visibilityDidChange) + + poll: -> + setTimeout => + unless @stoppedAt + @reconnectIfStale() + @poll() + , @getInterval() + + getInterval: -> + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) + clamp(interval, min, max) * 1000 + + reconnectIfStale: -> + if @connectionIsStale() + @reconnectAttempts++ + unless @disconnectedRecently() + @consumer.connection.reopen() + + connectionIsStale: -> + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold + + visibilityDidChange: => + if document.visibilityState is "visible" + setTimeout => + if @connectionIsStale() or not @consumer.connection.isOpen() + @consumer.connection.reopen() + , 200 + + now = -> + new Date().getTime() + + secondsSince = (time) -> + (now() - time) / 1000 + + clamp = (number, min, max) -> + Math.max(min, Math.min(max, number)) diff --git a/actioncable/app/assets/javascripts/action_cable/source/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/source/consumer.coffee new file mode 100644 index 0000000000..717c0641a9 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/source/consumer.coffee @@ -0,0 +1,25 @@ +#= require ./connection +#= require ./connection_monitor +#= require ./subscriptions +#= require ./subscription + +# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, +# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. +# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription +# method. +# +# The following example shows how this can be setup: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Consumer + constructor: (@url) -> + @subscriptions = new ActionCable.Subscriptions this + @connection = new ActionCable.Connection this + @connectionMonitor = new ActionCable.ConnectionMonitor this + + send: (data) -> + @connection.send(data) diff --git a/actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb b/actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb new file mode 100644 index 0000000000..f4615b7502 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb @@ -0,0 +1,23 @@ +#= require_self +#= require ./consumer + +@ActionCable = + INTERNAL: <%= ActionCable::INTERNAL.to_json %> + + createConsumer: (url = @getConfig("url")) -> + new ActionCable.Consumer @createWebSocketURL(url) + + getConfig: (name) -> + element = document.head.querySelector("meta[name='action-cable-#{name}']") + element?.getAttribute("content") + + createWebSocketURL: (url) -> + if url and not /^wss?:/i.test(url) + a = document.createElement("a") + a.href = url + # Fix populating Location properties in IE. Otherwise, protocol will be blank. + a.href = a.href + a.protocol = a.protocol.replace("http", "ws") + a.href + else + url diff --git a/actioncable/app/assets/javascripts/action_cable/source/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/source/subscription.coffee new file mode 100644 index 0000000000..339d676933 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/source/subscription.coffee @@ -0,0 +1,68 @@ +# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +# Channel instance on the server side. +# +# An example demonstrates the basic functionality: +# +# App.appearance = App.cable.subscriptions.create "AppearanceChannel", +# connected: -> +# # Called once the subscription has been successfully completed +# +# appear: -> +# @perform 'appear', appearing_on: @appearingOn() +# +# away: -> +# @perform 'away' +# +# appearingOn: -> +# $('main').data 'appearing-on' +# +# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server +# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). +# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. +# +# This is how the server component would look: +# +# class AppearanceChannel < ApplicationActionCable::Channel +# def subscribed +# current_user.appear +# end +# +# def unsubscribed +# current_user.disappear +# end +# +# def appear(data) +# current_user.appear on: data['appearing_on'] +# end +# +# def away +# current_user.away +# end +# end +# +# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. +# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. +class ActionCable.Subscription + constructor: (@subscriptions, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @subscriptions.add(this) + @consumer = @subscriptions.consumer + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @subscriptions.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee new file mode 100644 index 0000000000..ae041ffa2b --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee @@ -0,0 +1,64 @@ +# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user +# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Subscriptions + constructor: (@consumer) -> + @subscriptions = [] + + create: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new ActionCable.Subscription this, params, mixin + + # Private + + add: (subscription) -> + @subscriptions.push(subscription) + @notify(subscription, "initialized") + @sendCommand(subscription, "subscribe") + + remove: (subscription) -> + @forget(subscription) + + unless @findAll(subscription.identifier).length + @sendCommand(subscription, "unsubscribe") + + reject: (identifier) -> + for subscription in @findAll(identifier) + @forget(subscription) + @notify(subscription, "rejected") + + forget: (subscription) -> + @subscriptions = (s for s in @subscriptions when s isnt subscription) + + findAll: (identifier) -> + s for s in @subscriptions when s.identifier is identifier + + reload: -> + for subscription in @subscriptions + @sendCommand(subscription, "subscribe") + + notifyAll: (callbackName, args...) -> + for subscription in @subscriptions + @notify(subscription, callbackName, args...) + + notify: (subscription, callbackName, args...) -> + if typeof subscription is "string" + subscriptions = @findAll(subscription) + else + subscriptions = [subscription] + + for subscription in subscriptions + subscription[callbackName]?(args...) + + sendCommand: (subscription, command) -> + {identifier} = subscription + if identifier is ActionCable.INTERNAL.identifiers.ping + @consumer.connection.isOpen() + else + @consumer.send({command, identifier}) diff --git a/actioncable/lib/assets/javascripts/action_cable/index.js b/actioncable/lib/assets/javascripts/action_cable/index.js deleted file mode 100644 index 6c69e42337..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/index.js +++ /dev/null @@ -1 +0,0 @@ -//= require ./dist/action_cable diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee b/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee b/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee deleted file mode 100644 index 717c0641a9..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require ./connection -#= require ./connection_monitor -#= require ./subscriptions -#= require ./subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb b/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb deleted file mode 100644 index f4615b7502..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/index.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require ./consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee b/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/lib/assets/javascripts/action_cable/source/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) -- cgit v1.2.3 From e77368637e17e6a33db2713f651e85a09456c645 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 01:30:00 +1030 Subject: Switch the default redis adapter to a single-stream model This new adapter does get a little more intimate with the redis-rb gem's implementation than I would like, but it's the least bad of the approaches I've come up with. --- .../subscription_adapter/evented_redis.rb | 67 +++++++++ .../lib/action_cable/subscription_adapter/redis.rb | 156 +++++++++++++++++---- .../subscription_adapter/evented_redis_test.rb | 10 ++ .../test/subscription_adapter/redis_test.rb | 8 +- 4 files changed, 210 insertions(+), 31 deletions(-) create mode 100644 actioncable/lib/action_cable/subscription_adapter/evented_redis.rb create mode 100644 actioncable/test/subscription_adapter/evented_redis_test.rb (limited to 'actioncable') diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb new file mode 100644 index 0000000000..d697548cbd --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -0,0 +1,67 @@ +require 'thread' + +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' + +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module SubscriptionAdapter + class EventedRedis < Base # :nodoc: + @@mutex = Mutex.new + + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + + def broadcast(channel, payload) + redis_connection_for_broadcasts.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback { |reply| success_callback.call } if success_callback + end + end + + def unsubscribe(channel, message_callback) + redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + end + + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + + private + def redis_connection_for_subscriptions + ensure_reactor_running + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end + end + end + end + + def redis_connection_for_broadcasts + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end + end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 560b79df16..7076383efe 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,52 +1,40 @@ require 'thread' -gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' -require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def initialize(*) super - @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + @listener = nil + @redis_connection_for_broadcasts = nil end def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end - def subscribe(channel, message_callback, success_callback = nil) - redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback { |reply| success_callback.call } if success_callback - end + def subscribe(channel, callback, success_callback = nil) + listener.add_subscriber(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + def unsubscribe(channel, callback) + listener.remove_subscriber(channel, callback) end def shutdown - redis_connection_for_subscriptions.pubsub.close_connection - @redis_connection_for_subscriptions = nil + @listener.shutdown if @listener + end + + def redis_connection_for_subscriptions + ::Redis.new(@server.config.cable) end private - def redis_connection_for_subscriptions - ensure_reactor_running - @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." - end - end - end + def listener + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end def redis_connection_for_broadcasts @@ -55,12 +43,120 @@ module ActionCable end end - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? + class Listener < SubscriberMap + def initialize(adapter) + super() + + @adapter = adapter + + @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } + @subscription_lock = Mutex.new + + @raw_client = nil + + @when_connected = [] + + @thread = nil + end + + def listen(conn) + conn.without_reconnect do + original_client = conn.client + + conn.subscribe('_action_cable_internal') do |on| + on.subscribe do |chan, count| + @subscription_lock.synchronize do + if count == 1 + @raw_client = original_client + + until @when_connected.empty? + @when_connected.shift.call + end + end + + if callbacks = @subscribe_callbacks[chan] + next_callback = callbacks.shift + Concurrent.global_io_executor << next_callback if next_callback + @subscribe_callbacks.delete(chan) if callbacks.empty? + end + end + end + + on.message do |chan, message| + broadcast(chan, message) + end + + on.unsubscribe do |chan, count| + if count == 0 + @subscription_lock.synchronize do + @raw_client = nil + end + end + end + end + end + end + + def shutdown + @subscription_lock.synchronize do + return if @thread.nil? + + when_connected do + send_command('unsubscribe') + @raw_client = nil + end + end + + Thread.pass while @thread.alive? + end + + def add_channel(channel, on_success) + @subscription_lock.synchronize do + ensure_listener_running + @subscribe_callbacks[channel] << on_success + when_connected { send_command('subscribe', channel) } + end + end + + def remove_channel(channel) + @subscription_lock.synchronize do + when_connected { send_command('unsubscribe', channel) } + end + end + + def invoke_callback(*) + Concurrent.global_io_executor.post { super } end + + private + def ensure_listener_running + @thread ||= Thread.new do + Thread.current.abort_on_exception = true + + conn = @adapter.redis_connection_for_subscriptions + listen conn + end + end + + def when_connected(&block) + if @raw_client + block.call + else + @when_connected << block + end + end + + def send_command(*command) + @raw_client.write(command) + + very_raw_connection = + @raw_client.connection.instance_variable_defined?(:@connection) && + @raw_client.connection.instance_variable_get(:@connection) + + if very_raw_connection && very_raw_connection.respond_to?(:flush) + very_raw_connection.flush + end + end end end end diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb new file mode 100644 index 0000000000..70333e51bd --- /dev/null +++ b/actioncable/test/subscription_adapter/evented_redis_test.rb @@ -0,0 +1,10 @@ +require 'test_helper' +require_relative './common' + +class EventedRedisAdapterTest < ActionCable::TestCase + include CommonSubscriptionAdapterTest + + def cable_config + { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' } + end +end diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb index 8d52832c87..4f34dd86c9 100644 --- a/actioncable/test/subscription_adapter/redis_test.rb +++ b/actioncable/test/subscription_adapter/redis_test.rb @@ -5,6 +5,12 @@ class RedisAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest def cable_config - { adapter: 'redis', url: 'redis://127.0.0.1:6379/12' } + { adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' } + end +end + +class RedisAdapterTest::Hiredis < RedisAdapterTest + def cable_config + super.merge(driver: 'hiredis') end end -- cgit v1.2.3 From 4c38319cc25bb248947a089072442e843761e46d Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 03:00:35 +1030 Subject: Wait for EventMachine to finish starting --- actioncable/test/client_test.rb | 1 + 1 file changed, 1 insertion(+) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index d7eecfa322..3c79d61569 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -32,6 +32,7 @@ class ClientTest < ActionCable::TestCase server.config.channel_load_paths = [File.expand_path('client', __dir__)] Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? # faye-websocket is warning-rich @previous_verbose, $VERBOSE = $VERBOSE, nil -- cgit v1.2.3 From d6f2000a67cc63aa67414c75ce77de671824ec52 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 04:31:03 +1030 Subject: Wrangle the asset build into something that sounds more general --- actioncable/.gitignore | 2 +- actioncable/Rakefile | 35 ++++++---- .../app/assets/javascripts/action_cable.coffee.erb | 23 ++++++ .../javascripts/action_cable/connection.coffee | 81 ++++++++++++++++++++++ .../action_cable/connection_monitor.coffee | 79 +++++++++++++++++++++ .../javascripts/action_cable/consumer.coffee | 25 +++++++ .../app/assets/javascripts/action_cable/index.js | 1 - .../action_cable/source/connection.coffee | 81 ---------------------- .../action_cable/source/connection_monitor.coffee | 79 --------------------- .../action_cable/source/consumer.coffee | 25 ------- .../action_cable/source/index.coffee.erb | 23 ------ .../action_cable/source/subscription.coffee | 68 ------------------ .../action_cable/source/subscriptions.coffee | 64 ----------------- .../javascripts/action_cable/subscription.coffee | 68 ++++++++++++++++++ .../javascripts/action_cable/subscriptions.coffee | 64 +++++++++++++++++ 15 files changed, 363 insertions(+), 355 deletions(-) create mode 100644 actioncable/app/assets/javascripts/action_cable.coffee.erb create mode 100644 actioncable/app/assets/javascripts/action_cable/connection.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/consumer.coffee delete mode 100644 actioncable/app/assets/javascripts/action_cable/index.js delete mode 100644 actioncable/app/assets/javascripts/action_cable/source/connection.coffee delete mode 100644 actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee delete mode 100644 actioncable/app/assets/javascripts/action_cable/source/consumer.coffee delete mode 100644 actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb delete mode 100644 actioncable/app/assets/javascripts/action_cable/source/subscription.coffee delete mode 100644 actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/subscription.coffee create mode 100644 actioncable/app/assets/javascripts/action_cable/subscriptions.coffee (limited to 'actioncable') diff --git a/actioncable/.gitignore b/actioncable/.gitignore index 8ded114548..0a04b29786 100644 --- a/actioncable/.gitignore +++ b/actioncable/.gitignore @@ -1,2 +1,2 @@ -/lib/assets/javascripts/action_cable.js +/lib/assets/compiled /tmp diff --git a/actioncable/Rakefile b/actioncable/Rakefile index 0a036e3e3d..1d77fc7067 100644 --- a/actioncable/Rakefile +++ b/actioncable/Rakefile @@ -8,6 +8,9 @@ dir = File.dirname(__FILE__) task :default => :test +task :package => "assets:compile" +task "package:clean" => "assets:clean" + Rake::TestTask.new do |t| t.libs << "test" t.test_files = Dir.glob("#{dir}/test/**/*_test.rb") @@ -17,32 +20,38 @@ Rake::TestTask.new do |t| end namespace :assets do + root_path = Pathname.new(dir) + destination_path = root_path.join("lib/assets/compiled") + desc "Compile dist/action_cable.js" task :compile do puts 'Compiling Action Cable assets...' - asset_mapping = { "source.js" => "action_cable.js" } + precompile_list = %w(action_cable.js) - root_path = Pathname.new(dir) - load_path = root_path.join("app/assets/javascripts/action_cable") - destination_path = root_path.join("lib/assets/javascripts") + environment = Sprockets::Environment.new + environment.gzip = false + Pathname.glob(root_path.join("app/assets/*/")) do |subdir| + environment.append_path subdir + end compile_path = root_path.join("tmp/sprockets") compile_path.rmtree if compile_path.exist? compile_path.mkpath - environment = Sprockets::Environment.new - environment.append_path(load_path) - manifest = Sprockets::Manifest.new(environment.index, compile_path) - manifest.compile(asset_mapping.keys) + manifest.compile(precompile_list) - asset_mapping.each do |logical_path, dist_path| - fingerprint_path = manifest.assets[logical_path] - FileUtils.cp(compile_path.join(fingerprint_path), destination_path.join(dist_path)) + destination_path.rmtree if destination_path.exist? + manifest.assets.each do |path, fingerprint_path| + destination_path.join(path).dirname.mkpath + FileUtils.cp(compile_path.join(fingerprint_path), destination_path.join(path)) end - puts '======' - puts 'Action Cable assets compiled successfully!' + puts 'Done' + end + + task :clean do + destination_path.rmtree if destination_path.exist? end end diff --git a/actioncable/app/assets/javascripts/action_cable.coffee.erb b/actioncable/app/assets/javascripts/action_cable.coffee.erb new file mode 100644 index 0000000000..18a48c0610 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable.coffee.erb @@ -0,0 +1,23 @@ +#= require_self +#= require ./action_cable/consumer + +@ActionCable = + INTERNAL: <%= ActionCable::INTERNAL.to_json %> + + createConsumer: (url = @getConfig("url")) -> + new ActionCable.Consumer @createWebSocketURL(url) + + getConfig: (name) -> + element = document.head.querySelector("meta[name='action-cable-#{name}']") + element?.getAttribute("content") + + createWebSocketURL: (url) -> + if url and not /^wss?:/i.test(url) + a = document.createElement("a") + a.href = url + # Fix populating Location properties in IE. Otherwise, protocol will be blank. + a.href = a.href + a.protocol = a.protocol.replace("http", "ws") + a.href + else + url diff --git a/actioncable/app/assets/javascripts/action_cable/connection.coffee b/actioncable/app/assets/javascripts/action_cable/connection.coffee new file mode 100644 index 0000000000..fbd7dbd35b --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/connection.coffee @@ -0,0 +1,81 @@ +# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. + +{message_types} = ActionCable.INTERNAL + +class ActionCable.Connection + @reopenDelay: 500 + + constructor: (@consumer) -> + @open() + + send: (data) -> + if @isOpen() + @webSocket.send(JSON.stringify(data)) + true + else + false + + open: => + if @webSocket and not @isState("closed") + throw new Error("Existing connection must be closed before opening") + else + @webSocket = new WebSocket(@consumer.url) + @installEventHandlers() + true + + close: -> + @webSocket?.close() + + reopen: -> + if @isState("closed") + @open() + else + try + @close() + finally + setTimeout(@open, @constructor.reopenDelay) + + isOpen: -> + @isState("open") + + # Private + + isState: (states...) -> + @getState() in states + + getState: -> + return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState + null + + installEventHandlers: -> + for eventName of @events + handler = @events[eventName].bind(this) + @webSocket["on#{eventName}"] = handler + return + + events: + message: (event) -> + {identifier, message, type} = JSON.parse(event.data) + + switch type + when message_types.confirmation + @consumer.subscriptions.notify(identifier, "connected") + when message_types.rejection + @consumer.subscriptions.reject(identifier) + else + @consumer.subscriptions.notify(identifier, "received", message) + + open: -> + @disconnected = false + @consumer.subscriptions.reload() + + close: -> + @disconnect() + + error: -> + @disconnect() + + disconnect: -> + return if @disconnected + @disconnected = true + @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee new file mode 100644 index 0000000000..99b9a1c6d5 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/connection_monitor.coffee @@ -0,0 +1,79 @@ +# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting +# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. +class ActionCable.ConnectionMonitor + @pollInterval: + min: 3 + max: 30 + + @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) + + identifier: ActionCable.INTERNAL.identifiers.ping + + constructor: (@consumer) -> + @consumer.subscriptions.add(this) + @start() + + connected: -> + @reset() + @pingedAt = now() + delete @disconnectedAt + + disconnected: -> + @disconnectedAt = now() + + received: -> + @pingedAt = now() + + reset: -> + @reconnectAttempts = 0 + + start: -> + @reset() + delete @stoppedAt + @startedAt = now() + @poll() + document.addEventListener("visibilitychange", @visibilityDidChange) + + stop: -> + @stoppedAt = now() + document.removeEventListener("visibilitychange", @visibilityDidChange) + + poll: -> + setTimeout => + unless @stoppedAt + @reconnectIfStale() + @poll() + , @getInterval() + + getInterval: -> + {min, max} = @constructor.pollInterval + interval = 5 * Math.log(@reconnectAttempts + 1) + clamp(interval, min, max) * 1000 + + reconnectIfStale: -> + if @connectionIsStale() + @reconnectAttempts++ + unless @disconnectedRecently() + @consumer.connection.reopen() + + connectionIsStale: -> + secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold + + disconnectedRecently: -> + @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold + + visibilityDidChange: => + if document.visibilityState is "visible" + setTimeout => + if @connectionIsStale() or not @consumer.connection.isOpen() + @consumer.connection.reopen() + , 200 + + now = -> + new Date().getTime() + + secondsSince = (time) -> + (now() - time) / 1000 + + clamp = (number, min, max) -> + Math.max(min, Math.min(max, number)) diff --git a/actioncable/app/assets/javascripts/action_cable/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/consumer.coffee new file mode 100644 index 0000000000..717c0641a9 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/consumer.coffee @@ -0,0 +1,25 @@ +#= require ./connection +#= require ./connection_monitor +#= require ./subscriptions +#= require ./subscription + +# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, +# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. +# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription +# method. +# +# The following example shows how this can be setup: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Consumer + constructor: (@url) -> + @subscriptions = new ActionCable.Subscriptions this + @connection = new ActionCable.Connection this + @connectionMonitor = new ActionCable.ConnectionMonitor this + + send: (data) -> + @connection.send(data) diff --git a/actioncable/app/assets/javascripts/action_cable/index.js b/actioncable/app/assets/javascripts/action_cable/index.js deleted file mode 100644 index e97870c3b0..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/index.js +++ /dev/null @@ -1 +0,0 @@ -//= require_tree ./source diff --git a/actioncable/app/assets/javascripts/action_cable/source/connection.coffee b/actioncable/app/assets/javascripts/action_cable/source/connection.coffee deleted file mode 100644 index fbd7dbd35b..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/source/connection.coffee +++ /dev/null @@ -1,81 +0,0 @@ -# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation. - -{message_types} = ActionCable.INTERNAL - -class ActionCable.Connection - @reopenDelay: 500 - - constructor: (@consumer) -> - @open() - - send: (data) -> - if @isOpen() - @webSocket.send(JSON.stringify(data)) - true - else - false - - open: => - if @webSocket and not @isState("closed") - throw new Error("Existing connection must be closed before opening") - else - @webSocket = new WebSocket(@consumer.url) - @installEventHandlers() - true - - close: -> - @webSocket?.close() - - reopen: -> - if @isState("closed") - @open() - else - try - @close() - finally - setTimeout(@open, @constructor.reopenDelay) - - isOpen: -> - @isState("open") - - # Private - - isState: (states...) -> - @getState() in states - - getState: -> - return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState - null - - installEventHandlers: -> - for eventName of @events - handler = @events[eventName].bind(this) - @webSocket["on#{eventName}"] = handler - return - - events: - message: (event) -> - {identifier, message, type} = JSON.parse(event.data) - - switch type - when message_types.confirmation - @consumer.subscriptions.notify(identifier, "connected") - when message_types.rejection - @consumer.subscriptions.reject(identifier) - else - @consumer.subscriptions.notify(identifier, "received", message) - - open: -> - @disconnected = false - @consumer.subscriptions.reload() - - close: -> - @disconnect() - - error: -> - @disconnect() - - disconnect: -> - return if @disconnected - @disconnected = true - @consumer.subscriptions.notifyAll("disconnected") diff --git a/actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee b/actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee deleted file mode 100644 index 99b9a1c6d5..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/source/connection_monitor.coffee +++ /dev/null @@ -1,79 +0,0 @@ -# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting -# revival reconnections if things go astray. Internal class, not intended for direct user manipulation. -class ActionCable.ConnectionMonitor - @pollInterval: - min: 3 - max: 30 - - @staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings) - - identifier: ActionCable.INTERNAL.identifiers.ping - - constructor: (@consumer) -> - @consumer.subscriptions.add(this) - @start() - - connected: -> - @reset() - @pingedAt = now() - delete @disconnectedAt - - disconnected: -> - @disconnectedAt = now() - - received: -> - @pingedAt = now() - - reset: -> - @reconnectAttempts = 0 - - start: -> - @reset() - delete @stoppedAt - @startedAt = now() - @poll() - document.addEventListener("visibilitychange", @visibilityDidChange) - - stop: -> - @stoppedAt = now() - document.removeEventListener("visibilitychange", @visibilityDidChange) - - poll: -> - setTimeout => - unless @stoppedAt - @reconnectIfStale() - @poll() - , @getInterval() - - getInterval: -> - {min, max} = @constructor.pollInterval - interval = 5 * Math.log(@reconnectAttempts + 1) - clamp(interval, min, max) * 1000 - - reconnectIfStale: -> - if @connectionIsStale() - @reconnectAttempts++ - unless @disconnectedRecently() - @consumer.connection.reopen() - - connectionIsStale: -> - secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold - - disconnectedRecently: -> - @disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold - - visibilityDidChange: => - if document.visibilityState is "visible" - setTimeout => - if @connectionIsStale() or not @consumer.connection.isOpen() - @consumer.connection.reopen() - , 200 - - now = -> - new Date().getTime() - - secondsSince = (time) -> - (now() - time) / 1000 - - clamp = (number, min, max) -> - Math.max(min, Math.min(max, number)) diff --git a/actioncable/app/assets/javascripts/action_cable/source/consumer.coffee b/actioncable/app/assets/javascripts/action_cable/source/consumer.coffee deleted file mode 100644 index 717c0641a9..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/source/consumer.coffee +++ /dev/null @@ -1,25 +0,0 @@ -#= require ./connection -#= require ./connection_monitor -#= require ./subscriptions -#= require ./subscription - -# The ActionCable.Consumer establishes the connection to a server-side Ruby Connection object. Once established, -# the ActionCable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates. -# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription -# method. -# -# The following example shows how this can be setup: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Consumer - constructor: (@url) -> - @subscriptions = new ActionCable.Subscriptions this - @connection = new ActionCable.Connection this - @connectionMonitor = new ActionCable.ConnectionMonitor this - - send: (data) -> - @connection.send(data) diff --git a/actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb b/actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb deleted file mode 100644 index f4615b7502..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/source/index.coffee.erb +++ /dev/null @@ -1,23 +0,0 @@ -#= require_self -#= require ./consumer - -@ActionCable = - INTERNAL: <%= ActionCable::INTERNAL.to_json %> - - createConsumer: (url = @getConfig("url")) -> - new ActionCable.Consumer @createWebSocketURL(url) - - getConfig: (name) -> - element = document.head.querySelector("meta[name='action-cable-#{name}']") - element?.getAttribute("content") - - createWebSocketURL: (url) -> - if url and not /^wss?:/i.test(url) - a = document.createElement("a") - a.href = url - # Fix populating Location properties in IE. Otherwise, protocol will be blank. - a.href = a.href - a.protocol = a.protocol.replace("http", "ws") - a.href - else - url diff --git a/actioncable/app/assets/javascripts/action_cable/source/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/source/subscription.coffee deleted file mode 100644 index 339d676933..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/source/subscription.coffee +++ /dev/null @@ -1,68 +0,0 @@ -# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. -# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding -# Channel instance on the server side. -# -# An example demonstrates the basic functionality: -# -# App.appearance = App.cable.subscriptions.create "AppearanceChannel", -# connected: -> -# # Called once the subscription has been successfully completed -# -# appear: -> -# @perform 'appear', appearing_on: @appearingOn() -# -# away: -> -# @perform 'away' -# -# appearingOn: -> -# $('main').data 'appearing-on' -# -# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server -# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). -# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. -# -# This is how the server component would look: -# -# class AppearanceChannel < ApplicationActionCable::Channel -# def subscribed -# current_user.appear -# end -# -# def unsubscribed -# current_user.disappear -# end -# -# def appear(data) -# current_user.appear on: data['appearing_on'] -# end -# -# def away -# current_user.away -# end -# end -# -# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. -# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. -class ActionCable.Subscription - constructor: (@subscriptions, params = {}, mixin) -> - @identifier = JSON.stringify(params) - extend(this, mixin) - @subscriptions.add(this) - @consumer = @subscriptions.consumer - - # Perform a channel action with the optional data passed as an attribute - perform: (action, data = {}) -> - data.action = action - @send(data) - - send: (data) -> - @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) - - unsubscribe: -> - @subscriptions.remove(this) - - extend = (object, properties) -> - if properties? - for key, value of properties - object[key] = value - object diff --git a/actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee deleted file mode 100644 index ae041ffa2b..0000000000 --- a/actioncable/app/assets/javascripts/action_cable/source/subscriptions.coffee +++ /dev/null @@ -1,64 +0,0 @@ -# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user -# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: -# -# @App = {} -# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" -# App.appearance = App.cable.subscriptions.create "AppearanceChannel" -# -# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. -class ActionCable.Subscriptions - constructor: (@consumer) -> - @subscriptions = [] - - create: (channelName, mixin) -> - channel = channelName - params = if typeof channel is "object" then channel else {channel} - new ActionCable.Subscription this, params, mixin - - # Private - - add: (subscription) -> - @subscriptions.push(subscription) - @notify(subscription, "initialized") - @sendCommand(subscription, "subscribe") - - remove: (subscription) -> - @forget(subscription) - - unless @findAll(subscription.identifier).length - @sendCommand(subscription, "unsubscribe") - - reject: (identifier) -> - for subscription in @findAll(identifier) - @forget(subscription) - @notify(subscription, "rejected") - - forget: (subscription) -> - @subscriptions = (s for s in @subscriptions when s isnt subscription) - - findAll: (identifier) -> - s for s in @subscriptions when s.identifier is identifier - - reload: -> - for subscription in @subscriptions - @sendCommand(subscription, "subscribe") - - notifyAll: (callbackName, args...) -> - for subscription in @subscriptions - @notify(subscription, callbackName, args...) - - notify: (subscription, callbackName, args...) -> - if typeof subscription is "string" - subscriptions = @findAll(subscription) - else - subscriptions = [subscription] - - for subscription in subscriptions - subscription[callbackName]?(args...) - - sendCommand: (subscription, command) -> - {identifier} = subscription - if identifier is ActionCable.INTERNAL.identifiers.ping - @consumer.connection.isOpen() - else - @consumer.send({command, identifier}) diff --git a/actioncable/app/assets/javascripts/action_cable/subscription.coffee b/actioncable/app/assets/javascripts/action_cable/subscription.coffee new file mode 100644 index 0000000000..339d676933 --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/subscription.coffee @@ -0,0 +1,68 @@ +# A new subscription is created through the ActionCable.Subscriptions instance available on the consumer. +# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding +# Channel instance on the server side. +# +# An example demonstrates the basic functionality: +# +# App.appearance = App.cable.subscriptions.create "AppearanceChannel", +# connected: -> +# # Called once the subscription has been successfully completed +# +# appear: -> +# @perform 'appear', appearing_on: @appearingOn() +# +# away: -> +# @perform 'away' +# +# appearingOn: -> +# $('main').data 'appearing-on' +# +# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server +# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away). +# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter. +# +# This is how the server component would look: +# +# class AppearanceChannel < ApplicationActionCable::Channel +# def subscribed +# current_user.appear +# end +# +# def unsubscribed +# current_user.disappear +# end +# +# def appear(data) +# current_user.appear on: data['appearing_on'] +# end +# +# def away +# current_user.away +# end +# end +# +# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name. +# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method. +class ActionCable.Subscription + constructor: (@subscriptions, params = {}, mixin) -> + @identifier = JSON.stringify(params) + extend(this, mixin) + @subscriptions.add(this) + @consumer = @subscriptions.consumer + + # Perform a channel action with the optional data passed as an attribute + perform: (action, data = {}) -> + data.action = action + @send(data) + + send: (data) -> + @consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data)) + + unsubscribe: -> + @subscriptions.remove(this) + + extend = (object, properties) -> + if properties? + for key, value of properties + object[key] = value + object diff --git a/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee new file mode 100644 index 0000000000..ae041ffa2b --- /dev/null +++ b/actioncable/app/assets/javascripts/action_cable/subscriptions.coffee @@ -0,0 +1,64 @@ +# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user +# us ActionCable.Subscriptions#create, and it should be called through the consumer like so: +# +# @App = {} +# App.cable = ActionCable.createConsumer "ws://example.com/accounts/1" +# App.appearance = App.cable.subscriptions.create "AppearanceChannel" +# +# For more details on how you'd configure an actual channel subscription, see ActionCable.Subscription. +class ActionCable.Subscriptions + constructor: (@consumer) -> + @subscriptions = [] + + create: (channelName, mixin) -> + channel = channelName + params = if typeof channel is "object" then channel else {channel} + new ActionCable.Subscription this, params, mixin + + # Private + + add: (subscription) -> + @subscriptions.push(subscription) + @notify(subscription, "initialized") + @sendCommand(subscription, "subscribe") + + remove: (subscription) -> + @forget(subscription) + + unless @findAll(subscription.identifier).length + @sendCommand(subscription, "unsubscribe") + + reject: (identifier) -> + for subscription in @findAll(identifier) + @forget(subscription) + @notify(subscription, "rejected") + + forget: (subscription) -> + @subscriptions = (s for s in @subscriptions when s isnt subscription) + + findAll: (identifier) -> + s for s in @subscriptions when s.identifier is identifier + + reload: -> + for subscription in @subscriptions + @sendCommand(subscription, "subscribe") + + notifyAll: (callbackName, args...) -> + for subscription in @subscriptions + @notify(subscription, callbackName, args...) + + notify: (subscription, callbackName, args...) -> + if typeof subscription is "string" + subscriptions = @findAll(subscription) + else + subscriptions = [subscription] + + for subscription in subscriptions + subscription[callbackName]?(args...) + + sendCommand: (subscription, command) -> + {identifier} = subscription + if identifier is ActionCable.INTERNAL.identifiers.ping + @consumer.connection.isOpen() + else + @consumer.send({command, identifier}) -- cgit v1.2.3 From 93abf58787396661230f31c7a2f58c18f30dbec9 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 05:14:02 +1030 Subject: Drop the runtime dependency on coffee-rails --- actioncable/actioncable.gemspec | 1 - 1 file changed, 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 0976895ef7..8d35d819cf 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -20,7 +20,6 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version - s.add_dependency 'coffee-rails', '~> 4.1.0' s.add_dependency 'nio4r', '~> 1.2' s.add_dependency 'websocket-driver', '~> 0.6.1' -- cgit v1.2.3 From ddd84f6193ee44cdf33db60db469463940ea76cd Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Sat, 30 Jan 2016 21:39:05 -0500 Subject: Remove unused method --- actioncable/test/client_test.rb | 9 --------- 1 file changed, 9 deletions(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index d7eecfa322..28086337c3 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -145,15 +145,6 @@ class ClientTest < ActionCable::TestCase @ws.close @closed.wait(WAIT_WHEN_EXPECTING_EVENT) end - - def close! - sock = BasicSocket.for_fd(@ws.instance_variable_get(:@stream).detach) - - # Force a TCP reset - sock.setsockopt(Socket::SOL_SOCKET, Socket::SO_LINGER, [1, 0].pack('ii')) - - sock.close - end end def faye_client(port) -- cgit v1.2.3 From 55e33667a6da92bea28eef8c3a435f45dbf6597b Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 13:22:47 +1030 Subject: Remove development dependencies from actioncable.gemspec None of the other components use them, so we should be consistent. --- actioncable/actioncable.gemspec | 9 --------- 1 file changed, 9 deletions(-) (limited to 'actioncable') diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 8d35d819cf..c65ff7871f 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -22,13 +22,4 @@ Gem::Specification.new do |s| s.add_dependency 'nio4r', '~> 1.2' s.add_dependency 'websocket-driver', '~> 0.6.1' - - s.add_development_dependency 'coffee-script', '~> 2.4.1' - s.add_development_dependency 'coffee-script-source', '~> 1.10.0' - s.add_development_dependency 'em-hiredis', '~> 0.3.0' - s.add_development_dependency 'mocha' - s.add_development_dependency 'pg' - s.add_development_dependency 'puma' - s.add_development_dependency 'redis', '~> 3.0' - s.add_development_dependency 'sprockets', '~> 3.5.2' end -- cgit v1.2.3 From 49f6ce63f33b7817bcbd0cdf5f8881b63f40d9c9 Mon Sep 17 00:00:00 2001 From: Sean Griffin Date: Mon, 1 Feb 2016 14:27:38 -0700 Subject: Preparing for Rails 5.0.0.beta2 --- actioncable/CHANGELOG.md | 5 +++++ actioncable/lib/action_cable/gem_version.rb | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index a33b7b6b4b..5c968a48fc 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,8 @@ +## Rails 5.0.0.beta2 (February 01, 2016) ## + +* No changes. + + * Create notion of an `ActionCable::SubscriptionAdapter`. Separate out Redis functionality into `ActionCable::SubscriptionAdapter::Redis`, and add a diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index c652fb91ae..a71603e61a 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -8,7 +8,7 @@ module ActionCable MAJOR = 5 MINOR = 0 TINY = 0 - PRE = "beta1.1" + PRE = "beta2" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end -- cgit v1.2.3 From 60b040e362086fa11f86d35938d515145241174e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20Mendon=C3=A7a=20Fran=C3=A7a?= Date: Mon, 1 Feb 2016 19:57:31 -0200 Subject: Add some Action Cable CHANGELOG entries And improve changelongs. [ci skip] --- actioncable/CHANGELOG.md | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index 5c968a48fc..e671a07563 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,7 +1,16 @@ ## Rails 5.0.0.beta2 (February 01, 2016) ## -* No changes. +* Support PostgreSQL pubsub adapter. + *Jon Moss* + +* Remove EventMachine dependency. + + *Matthew Draper* + +* Remove Celluloid dependency. + + *Mike Perham* * Create notion of an `ActionCable::SubscriptionAdapter`. Separate out Redis functionality into -- cgit v1.2.3 From 830543738507f49444956b3b6ae897f4638b2523 Mon Sep 17 00:00:00 2001 From: Nick Quaranto Date: Tue, 2 Feb 2016 11:16:41 -0500 Subject: [ci skip] Several ActionCable documentation updates: * Properly indent code sample in ActionCable::Channel::Streams * Add a doc comment for #stop_all_streams * Reformat + add blocks around code references in ActionCable::Base docs * Clarify and a little better grammar on ActionCable::RemoteConnections * Correct indentation and clean up ActionCable::Server::Broadcasting code sample --- actioncable/lib/action_cable/channel/base.rb | 33 ++++++++++++++-------- actioncable/lib/action_cable/channel/streams.rb | 26 +++++++++-------- actioncable/lib/action_cable/remote_connections.rb | 12 ++++---- .../lib/action_cable/server/broadcasting.rb | 22 +++++++-------- 4 files changed, 53 insertions(+), 40 deletions(-) (limited to 'actioncable') diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 88cdc1cab1..874ebe2e71 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -32,8 +32,11 @@ module ActionCable # # == Action processing # - # Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's a remote-procedure call model. You can - # declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client. + # Unlike subclasses of ActionController::Base, channels do not follow a REST + # constraint form for their actions. Instead, ActionCable operates through a + # remote-procedure call model. You can declare any public method on the + # channel (optionally taking a data argument), and this method is + # automatically exposed as callable to the client. # # Example: # @@ -60,18 +63,22 @@ module ActionCable # end # end # - # In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away - # are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then - # uses as part of its model call. #away does not, it's simply a trigger action. + # In this example, subscribed/unsubscribed are not callable methods, as they + # were already declared in ActionCable::Channel::Base, but #appear + # and #away are. #generate_connection_token is also not + # callable as it's a private method. You'll see that appear accepts a data + # parameter, which it then uses as part of its model call. #away + # does not, since it's simply a trigger action. # - # Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection. - # All such identifiers will automatically create a delegation method of the same name on the channel instance. + # Also note that in this example, current_user is available because + # it was marked as an identifying attribute on the connection. All such + # identifiers will automatically create a delegation method of the same name + # on the channel instance. # # == Rejecting subscription requests # - # A channel can reject a subscription request in the #subscribed callback by invoking #reject! - # - # Example: + # A channel can reject a subscription request in the #subscribed callback by + # invoking the #reject method: # # class ChatChannel < ApplicationCable::Channel # def subscribed @@ -80,8 +87,10 @@ module ActionCable # end # end # - # In this example, the subscription will be rejected if the current_user does not have access to the chat room. - # On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request. + # In this example, the subscription will be rejected if the + # current_user does not have access to the chat room. On the + # client-side, the Channel#rejected callback will get invoked when + # the server rejects the subscription request. class Base include Callbacks include PeriodicTimers diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index a26373e387..3158f30814 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -41,22 +41,23 @@ module ActionCable # Example below shows how you can use this to provide performance introspection in the process: # # class ChatChannel < ApplicationCable::Channel - # def subscribed - # @room = Chat::Room[params[:room_number]] + # def subscribed + # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) + # stream_for @room, -> (encoded_message) do + # message = ActiveSupport::JSON.decode(encoded_message) # - # if message['originated_at'].present? - # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) + # if message['originated_at'].present? + # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # - # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing - # logger.info "Message took #{elapsed_time}s to arrive" - # end + # ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing + # logger.info "Message took #{elapsed_time}s to arrive" + # end # - # transmit message - # end - # end + # transmit message + # end + # end + # end # # You can stop streaming from all broadcasts by calling #stop_all_streams. module Streams @@ -90,6 +91,7 @@ module ActionCable stream_from(broadcasting_for([ channel_name, model ]), callback) end + # Unsubscribes all streams associated with this channel from the pubsub queue. def stop_all_streams streams.each do |broadcasting, callback| pubsub.unsubscribe broadcasting, callback diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index aa2fc95d2f..7ec121308a 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -1,6 +1,7 @@ module ActionCable - # If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by - # searching the identifier declared on the connection. Example: + # If you need to disconnect a given connection, you can go through the + # RemoteConnections. You can find the connections you're looking for by + # searching for the identifier declared on the connection. For example: # # module ApplicationCable # class Connection < ActionCable::Connection::Base @@ -11,8 +12,9 @@ module ActionCable # # ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect # - # That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses - # the internal channel that all these servers are subscribed to). + # This will disconnect all the connections established for + # User.find(1) across all servers running on all machines, because + # it uses the internal channel that all these servers are subscribed to. class RemoteConnections attr_reader :server @@ -25,7 +27,7 @@ module ActionCable end private - # Represents a single remote connection found via ActionCable.server.remote_connections.where(*). + # Represents a single remote connection found via ActionCable.server.remote_connections.where(*). # Exists for the solely for the purpose of calling #disconnect on that connection. class RemoteConnection class InvalidIdentifiersError < StandardError; end diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 4a26ed9269..7e8aef45f4 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -4,19 +4,19 @@ module ActionCable # broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example: # # class WebNotificationsChannel < ApplicationCable::Channel - # def subscribed - # stream_from "web_notifications_#{current_user.id}" - # end - # end + # def subscribed + # stream_from "web_notifications_#{current_user.id}" + # end + # end # - # # Somewhere in your app this is called, perhaps from a NewCommentJob - # ActionCable.server.broadcast \ - # "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' } + # # Somewhere in your app this is called, perhaps from a NewCommentJob + # ActionCable.server.broadcast \ + # "web_notifications_1", { title: "New things!", body: "All that's fit for print" } # - # # Client-side coffescript, which assumes you've already requested the right to send web notifications - # App.cable.subscriptions.create "WebNotificationsChannel", - # received: (data) -> - # new Notification data['title'], body: data['body'] + # # Client-side CoffeeScript, which assumes you've already requested the right to send web notifications + # App.cable.subscriptions.create "WebNotificationsChannel", + # received: (data) -> + # new Notification data['title'], body: data['body'] module Broadcasting # Broadcast a hash directly to a named broadcasting. It'll automatically be JSON encoded. def broadcast(broadcasting, message) -- cgit v1.2.3 From 3366a3ad780b140805890b2d3b2d613acec441e4 Mon Sep 17 00:00:00 2001 From: "Hongli Lai (Phusion)" Date: Wed, 3 Feb 2016 18:09:58 +0100 Subject: Document the fact that Action Cable does not require a multi-threaded app server [ci skip] --- actioncable/README.md | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) (limited to 'actioncable') diff --git a/actioncable/README.md b/actioncable/README.md index ac57532b62..6e74551483 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -448,8 +448,17 @@ as long as you haven’t committed any thread-safety sins. But this also means that Action Cable needs to run in its own server process. So you'll have one set of server processes for your normal web work, and another -set of server processes for the Action Cable. The former can be single-threaded, -like Unicorn, but the latter must be multi-threaded, like Puma. +set of server processes for the Action Cable. + +The Action Cable server does _not_ need to be a multi-threaded application server. +This is because Action Cable uses the [Rack socket hijacking API](http://old.blog.phusion.nl/2013/01/23/the-new-rack-socket-hijacking-api/) +to take over control of connections from the application server. Action Cable +then manages connections internally, in a multithreaded manner, regardless of +whether the application server is multi-threaded or not. So Action Cable works +with all the popular application servers -- Unicorn, Puma and Passenger. + +Action Cable does not work with WEBrick, because WEBrick does not support the +Rack socket hijacking API. ## License -- cgit v1.2.3 From 5e5fd246d5852b1c49dfdb8e635fb2e2c6ae8e55 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Thu, 4 Feb 2016 12:10:35 +0100 Subject: Allow for non-standard redis connectors --- actioncable/CHANGELOG.md | 7 +++++++ .../lib/action_cable/subscription_adapter/evented_redis.rb | 12 ++++++++++-- actioncable/lib/action_cable/subscription_adapter/redis.rb | 6 +++++- 3 files changed, 22 insertions(+), 3 deletions(-) (limited to 'actioncable') diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index e671a07563..bfc229d795 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,10 @@ +* Added ActionCable::SubscriptionAdapter::EventedRedis.em_redis_connector/redis_connector and + ActionCable::SubscriptionAdapter::Redis.redis_connector factory methods for redis connections, + so you can overwrite with your own initializers. This is used when you want to use different-than-standard Redis adapters, + like for Makara distributed Redis. + + *DHH* + ## Rails 5.0.0.beta2 (February 01, 2016) ## * Support PostgreSQL pubsub adapter. diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index d697548cbd..af04a58c70 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -13,6 +13,14 @@ module ActionCable class EventedRedis < Base # :nodoc: @@mutex = Mutex.new + # Overwrite this factory method for EventMachine redis connections if you want to use a different Redis library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil @@ -41,7 +49,7 @@ module ActionCable def redis_connection_for_subscriptions ensure_reactor_running @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." end @@ -51,7 +59,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 7076383efe..ba4934a264 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -6,6 +6,10 @@ require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @listener = nil @@ -39,7 +43,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end -- cgit v1.2.3 From cdb6f2eb9e9e60d0c6aa6ddcb854f74595fa5f06 Mon Sep 17 00:00:00 2001 From: "yuuji.yaginuma" Date: Fri, 5 Feb 2016 21:52:59 +0900 Subject: =?UTF-8?q?don=E2=80=99t=20explicitly=20mention=20EventMachine=20[?= =?UTF-8?q?ci=20skip]?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow up to 6accef4e11b0c793e1c085536b5ed27f32b6a0c3 --- actioncable/lib/rails/generators/channel/templates/channel.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/lib/rails/generators/channel/templates/channel.rb b/actioncable/lib/rails/generators/channel/templates/channel.rb index 6cf04ee61f..7bff3341c1 100644 --- a/actioncable/lib/rails/generators/channel/templates/channel.rb +++ b/actioncable/lib/rails/generators/channel/templates/channel.rb @@ -1,4 +1,4 @@ -# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading. +# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading. <% module_namespacing do -%> class <%= class_name %>Channel < ApplicationCable::Channel def subscribed -- cgit v1.2.3 From 7fe32d28a8e73bfa826773c5a8777a316126cd38 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Fri, 5 Feb 2016 11:56:16 +0100 Subject: Cant run on an out-of-the-box OSX installation without running out of TOO MANY FILES OPEN --- actioncable/test/client_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 199d2b90a3..4ade9832e0 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -187,7 +187,7 @@ class ClientTest < ActionCable::TestCase def test_many_clients with_puma_server do |port| - clients = 200.times.map { faye_client(port) } + clients = 100.times.map { faye_client(port) } clients.map {|c| Concurrent::Future.execute { c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') -- cgit v1.2.3 From 0ec48ab0cbc4ebb739101fffcbee98e7347aefda Mon Sep 17 00:00:00 2001 From: Daniel Fox Date: Sun, 7 Feb 2016 17:02:47 -0600 Subject: config examples for ActionCable now use Rails.application.config.action_cable Some existing examples used ActionCable.server.config but for configuring allowed_request_origins that is overridden in development mode. The correct place to set that is Rails.application.config.action_cable which the ActionCable initializer loads from. I thought the other two examples should be changed as well just in case a default value that would override a configured value is introduced for either log_tags or disable_request_forgery_protection in the future. --- actioncable/README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'actioncable') diff --git a/actioncable/README.md b/actioncable/README.md index 6e74551483..3316f88a23 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -324,7 +324,7 @@ Rails.application.paths.add "config/cable", with: "somewhere/else/cable.yml" Action Cable will only accept requests from specified origins, which are passed to the server config as an array. The origins can be instances of strings or regular expressions, against which a check for match will be performed. ```ruby -ActionCable.server.config.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/] +Rails.application.config.action_cable.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/] ``` When running in the development environment, this defaults to "http://localhost:3000". @@ -332,7 +332,7 @@ When running in the development environment, this defaults to "http://localhost: To disable and allow requests from any origin: ```ruby -ActionCable.server.config.disable_request_forgery_protection = true +Rails.application.config.action_cable.disable_request_forgery_protection = true ``` ### Consumer Configuration @@ -374,7 +374,7 @@ App.cable = ActionCable.createConsumer() The other common option to configure is the log tags applied to the per-connection logger. Here's close to what we're using in Basecamp: ```ruby -ActionCable.server.config.log_tags = [ +Rails.application.config.action_cable.log_tags = [ -> request { request.env['bc.account_id'] || "no-account" }, :action_cable, -> request { request.uuid } -- cgit v1.2.3 From 3546b3f0d406499827e40c18b88a05e624aee6ed Mon Sep 17 00:00:00 2001 From: "yuuji.yaginuma" Date: Mon, 8 Feb 2016 08:49:49 +0900 Subject: remove `faye-websocket` dependency from README [ci skip] `faye-websocket` gem is no longer used from 322dca293b3716ccaa09e7e82046e539b0d2ffda. --- actioncable/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/README.md b/actioncable/README.md index 6e74551483..a1261f0820 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -436,7 +436,7 @@ messages back and forth over the WebSocket cable connection. This dependency may be alleviated in the future, but for the moment that's what it is. So be sure to have Redis installed and running. -The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby). +The Ruby side of things is built on top of [websocket-driver](https://github.com/faye/websocket-driver-ruby), [nio4r](https://github.com/celluloid/nio4r), and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby). ## Deployment -- cgit v1.2.3 From 3f184cab489618aed0b2969ad7893ee3bdd7a843 Mon Sep 17 00:00:00 2001 From: Ryuta Kamizono Date: Tue, 9 Feb 2016 09:47:47 +0900 Subject: Fix typo [ci skip] --- actioncable/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/README.md b/actioncable/README.md index 7f7a830e6c..5029f583cb 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -17,7 +17,7 @@ The client of a WebSocket connection is called the consumer. Each consumer can in turn subscribe to multiple cable channels. Each channel encapsulates a logical unit of work, similar to what a controller does in a regular MVC setup. For example, -you could have a `ChatChannel` and a `AppearancesChannel`, and a consumer could be subscribed to either +you could have a `ChatChannel` and an `AppearancesChannel`, and a consumer could be subscribed to either or to both of these channels. At the very least, a consumer should be subscribed to one channel. When the consumer is subscribed to a channel, they act as a subscriber. The connection between -- cgit v1.2.3 From f2fcd3a000f70cc520b9267e82184ab87fea7f80 Mon Sep 17 00:00:00 2001 From: Mawueli Kofi Adzoe Date: Tue, 9 Feb 2016 22:35:40 -0700 Subject: Fix tiny grammar. --- actioncable/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'actioncable') diff --git a/actioncable/README.md b/actioncable/README.md index 5029f583cb..3ed8a23a29 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -188,7 +188,7 @@ can be reached as remote procedure calls via a subscription's `perform` method. The appearance example was all about exposing server functionality to client-side invocation over the WebSocket connection. But the great thing about WebSockets is that it's a two-way street. So now let's show an example where the server invokes -action on the client. +an action on the client. This is a web notification channel that allows you to trigger client-side web notifications when you broadcast to the right streams: -- cgit v1.2.3