From 68a9060d02b1eb35c12843c0f1653809b776b35b Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Wed, 20 Jan 2016 15:54:20 +1030 Subject: Using a hacked faye-websocket, drop EventMachine --- Gemfile | 2 ++ Gemfile.lock | 16 +++++++++---- actioncable/actioncable.gemspec | 1 - .../lib/action_cable/channel/periodic_timers.rb | 4 ++-- actioncable/lib/action_cable/channel/streams.rb | 2 +- .../action_cable/connection/internal_channel.rb | 4 ++-- actioncable/lib/action_cable/process/logging.rb | 9 ++++---- actioncable/lib/action_cable/server.rb | 4 ---- actioncable/lib/action_cable/server/connections.rb | 8 +++---- .../lib/action_cable/subscription_adapter/async.rb | 4 ++-- .../subscription_adapter/postgresql.rb | 4 ++-- .../lib/action_cable/subscription_adapter/redis.rb | 16 +++++++++++++ actioncable/test/channel/periodic_timers_test.rb | 2 +- actioncable/test/channel/stream_test.rb | 22 +++++++----------- actioncable/test/connection/base_test.rb | 13 ++++++----- actioncable/test/subscription_adapter/common.rb | 2 -- actioncable/test/test_helper.rb | 26 ++++++---------------- 17 files changed, 70 insertions(+), 69 deletions(-) diff --git a/Gemfile b/Gemfile index b69c05025f..c3dd4436da 100644 --- a/Gemfile +++ b/Gemfile @@ -66,6 +66,8 @@ group :cable do gem 'em-hiredis', require: false gem 'redis', require: false + + gem 'faye-websocket', github: "matthewd/faye-websocket-ruby", branch: "no-em-concept", require: false end # Add your own local bundler stuff. diff --git a/Gemfile.lock b/Gemfile.lock index 0789c59905..3467c23d6f 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -19,6 +19,16 @@ GIT qu (= 0.2.0) redis-namespace +GIT + remote: git://github.com/matthewd/faye-websocket-ruby.git + revision: f608bb57844b91b397817b06ab6bf744324010ab + branch: no-em-concept + specs: + faye-websocket (0.10.2) + concurrent-ruby (~> 1.0) + nio4r (~> 1.2) + websocket-driver (>= 0.5.1) + GIT remote: git://github.com/sass/sass.git revision: bce9509f396225d721501ea1070a6871b708abb1 @@ -32,7 +42,6 @@ PATH actioncable (5.0.0.beta1) actionpack (= 5.0.0.beta1) coffee-rails (~> 4.1.0) - eventmachine (~> 1.0) faye-websocket (~> 0.10.0) websocket-driver (~> 0.6.1) actionmailer (5.0.0.beta1) @@ -145,9 +154,6 @@ GEM erubis (2.7.0) eventmachine (1.0.9.1) execjs (2.6.0) - faye-websocket (0.10.2) - eventmachine (>= 0.12.0) - websocket-driver (>= 0.5.1) ffi (1.9.10) ffi (1.9.10-x64-mingw32) ffi (1.9.10-x86-mingw32) @@ -185,6 +191,7 @@ GEM mysql2 (0.4.2) mysql2 (0.4.2-x64-mingw32) mysql2 (0.4.2-x86-mingw32) + nio4r (1.2.0) nokogiri (1.6.7.1) mini_portile2 (~> 2.0.0.rc2) nokogiri (1.6.7.1-x64-mingw32) @@ -307,6 +314,7 @@ DEPENDENCIES delayed_job delayed_job_active_record em-hiredis + faye-websocket! jquery-rails json kindlerb (= 0.1.1) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index a36acc8f6f..847fcc71c3 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,7 +21,6 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'eventmachine', '~> 1.0' s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 7f0fb37afc..56597d02d7 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -27,14 +27,14 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| - active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do + active_periodic_timers << Concurrent::TimerTask.new(execution_interval: options[:every]) do connection.worker_pool.async_run_periodic_timer(self, callback) end end end def stop_periodic_timers - active_periodic_timers.each { |timer| timer.cancel } + active_periodic_timers.each { |timer| timer.shutdown } end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index e2876ef6fa..a26373e387 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -75,7 +75,7 @@ module ActionCable callback ||= default_stream_callback(broadcasting) streams << [ broadcasting, callback ] - EM.next_tick do + Concurrent.global_io_executor.post do pubsub.subscribe(broadcasting, callback, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 54ed7672d2..27826792b3 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, callback) } + Concurrent.global_io_executor.post { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| Concurrent.global_io_executor.post { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb index dce637b3ca..ebb8990891 100644 --- a/actioncable/lib/action_cable/process/logging.rb +++ b/actioncable/lib/action_cable/process/logging.rb @@ -1,7 +1,8 @@ require 'action_cable/server' -require 'eventmachine' -EM.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") +if defined?(::EventMachine) + EventMachine.error_handler do |e| + puts "Error raised inside the event loop: #{e.message}" + puts e.backtrace.join("\n") + end end diff --git a/actioncable/lib/action_cable/server.rb b/actioncable/lib/action_cable/server.rb index a2a89d5f1e..bd6a3826a3 100644 --- a/actioncable/lib/action_cable/server.rb +++ b/actioncable/lib/action_cable/server.rb @@ -1,7 +1,3 @@ -require 'eventmachine' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module Server extend ActiveSupport::Autoload diff --git a/actioncable/lib/action_cable/server/connections.rb b/actioncable/lib/action_cable/server/connections.rb index 47dcea8c20..8671dd5ebd 100644 --- a/actioncable/lib/action_cable/server/connections.rb +++ b/actioncable/lib/action_cable/server/connections.rb @@ -22,11 +22,9 @@ module ActionCable # then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically # disconnect. def setup_heartbeat_timer - EM.next_tick do - @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do - EM.next_tick { connections.map(&:beat) } - end - end + @heartbeat_timer ||= Concurrent::TimerTask.new(execution_interval: BEAT_INTERVAL) do + Concurrent.global_io_executor.post { connections.map(&:beat) } + end.tap(&:execute) end def open_connections_statistics diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..c88b03947a 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..3ce1bbed68 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + Concurrent.global_io_executor << callback if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..a035e3988d 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,11 +1,18 @@ +require 'thread' + gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + @@mutex = Mutex.new + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -27,6 +34,7 @@ module ActionCable private def redis_connection_for_subscriptions + ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -37,6 +45,14 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end end end end diff --git a/actioncable/test/channel/periodic_timers_test.rb b/actioncable/test/channel/periodic_timers_test.rb index 1590a12f09..64f0247cd6 100644 --- a/actioncable/test/channel/periodic_timers_test.rb +++ b/actioncable/test/channel/periodic_timers_test.rb @@ -31,7 +31,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase end test "timer start and stop" do - EventMachine::PeriodicTimer.expects(:new).times(2).returns(true) + Concurrent::TimerTask.expects(:new).times(2).returns(true) channel = ChatChannel.new @connection, "{id: 1}", { id: 1 } channel.expects(:stop_periodic_timers).once diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 3fa2b291b7..947efd96d4 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -31,9 +31,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_for" do run_in_eventmachine do connection = TestConnection.new - EM.next_tick do - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - end + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "" channel.stream_for Room.new(1) @@ -41,39 +39,35 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end test "stream_from subscription confirmation" do - EM.run do + run_in_eventmachine do connection = TestConnection.new ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission - EM::Timer.new(0.1) do - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + wait_for_async - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) - EM.run_deferred_callbacks - EM.stop - end + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" end end test "subscription confirmation should only be sent out once" do - EM.run do + run_in_eventmachine do connection = TestConnection.new channel = ChatChannel.new connection, "test_channel" channel.send_confirmation channel.send_confirmation - EM.run_deferred_callbacks + wait_for_async expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" assert_equal 1, connection.transmissions.size - EM.stop end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index 182562db82..579578d0a7 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -37,6 +37,8 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process assert connection.websocket.possible? + + wait_for_async assert connection.websocket.alive? end end @@ -58,11 +60,10 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) - # Allow EM to run on_open callback - EM.next_tick do - assert_equal [ connection ], @server.connections - assert connection.connected - end + wait_for_async + + assert_equal [ connection ], @server.connections + assert connection.connected end end @@ -72,7 +73,7 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase connection.process # Setup the connection - EventMachine.stubs(:add_periodic_timer).returns(true) + Concurrent::TimerTask.stubs(:new).returns(true) connection.send :on_open assert connection.connected diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index d4a13be889..5efc6d6b81 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -24,8 +24,6 @@ module CommonSubscriptionAdapterTest # and now the "real" setup for our test: - spawn_eventmachine - server.config.cable = cable_config.with_indifferent_access adapter_klass = server.config.pubsub_adapter diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 6636ce078b..8ddbd4e764 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -13,28 +13,16 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } -require 'faye/websocket' -class << Faye::WebSocket - remove_method :ensure_reactor_running - - # We don't want Faye to start the EM reactor in tests because it makes testing much harder. - # We want to be able to start and stop EM loop in tests to make things simpler. - def ensure_reactor_running - # no-op - end -end - class ActionCable::TestCase < ActiveSupport::TestCase - def run_in_eventmachine - EM.run do - yield - - EM.run_deferred_callbacks - EM.stop + def wait_for_async + e = Concurrent.global_io_executor + until e.completed_task_count == e.scheduled_task_count + sleep 0.1 end end - def spawn_eventmachine - Thread.new { EventMachine.run } unless EventMachine.reactor_running? + def run_in_eventmachine + yield + wait_for_async end end -- cgit v1.2.3 From 322dca293b3716ccaa09e7e82046e539b0d2ffda Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 21 Jan 2016 14:59:11 +1030 Subject: Import the relevant portions of faye-websocket (as adapted to use concurrent-ruby / nio4r instead of eventmachine) --- Gemfile | 2 - Gemfile.lock | 13 +- actioncable/actioncable.gemspec | 2 +- actioncable/lib/action_cable/connection.rb | 5 +- actioncable/lib/action_cable/connection/base.rb | 32 +++-- .../lib/action_cable/connection/client_socket.rb | 152 +++++++++++++++++++++ actioncable/lib/action_cable/connection/stream.rb | 59 ++++++++ .../action_cable/connection/stream_event_loop.rb | 68 +++++++++ .../lib/action_cable/connection/web_socket.rb | 22 +-- actioncable/lib/action_cable/server/base.rb | 4 + actioncable/test/connection/base_test.rb | 6 +- actioncable/test/connection/identifier_test.rb | 4 +- .../test/connection/multiple_identifiers_test.rb | 4 +- actioncable/test/stubs/test_server.rb | 3 +- 14 files changed, 332 insertions(+), 44 deletions(-) create mode 100644 actioncable/lib/action_cable/connection/client_socket.rb create mode 100644 actioncable/lib/action_cable/connection/stream.rb create mode 100644 actioncable/lib/action_cable/connection/stream_event_loop.rb diff --git a/Gemfile b/Gemfile index c3dd4436da..b69c05025f 100644 --- a/Gemfile +++ b/Gemfile @@ -66,8 +66,6 @@ group :cable do gem 'em-hiredis', require: false gem 'redis', require: false - - gem 'faye-websocket', github: "matthewd/faye-websocket-ruby", branch: "no-em-concept", require: false end # Add your own local bundler stuff. diff --git a/Gemfile.lock b/Gemfile.lock index 3467c23d6f..61b90d305c 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -19,16 +19,6 @@ GIT qu (= 0.2.0) redis-namespace -GIT - remote: git://github.com/matthewd/faye-websocket-ruby.git - revision: f608bb57844b91b397817b06ab6bf744324010ab - branch: no-em-concept - specs: - faye-websocket (0.10.2) - concurrent-ruby (~> 1.0) - nio4r (~> 1.2) - websocket-driver (>= 0.5.1) - GIT remote: git://github.com/sass/sass.git revision: bce9509f396225d721501ea1070a6871b708abb1 @@ -42,7 +32,7 @@ PATH actioncable (5.0.0.beta1) actionpack (= 5.0.0.beta1) coffee-rails (~> 4.1.0) - faye-websocket (~> 0.10.0) + nio4r (~> 1.2) websocket-driver (~> 0.6.1) actionmailer (5.0.0.beta1) actionpack (= 5.0.0.beta1) @@ -314,7 +304,6 @@ DEPENDENCIES delayed_job delayed_job_active_record em-hiredis - faye-websocket! jquery-rails json kindlerb (= 0.1.1) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 847fcc71c3..14f968f1ef 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,7 +21,7 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' - s.add_dependency 'faye-websocket', '~> 0.10.0' + s.add_dependency 'nio4r', '~> 1.2' s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_development_dependency 'em-hiredis', '~> 0.3.0' diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index b672e00682..902efb07e2 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -5,12 +5,15 @@ module ActionCable eager_autoload do autoload :Authorization autoload :Base + autoload :ClientSocket autoload :Identification autoload :InternalChannel autoload :MessageBuffer - autoload :WebSocket + autoload :Stream + autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :WebSocket end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..0016d1a1a4 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,14 +49,14 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + delegate :stream_event_loop, :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env @logger = new_tagged_logger - @websocket = ActionCable::Connection::WebSocket.new(env) + @websocket = ActionCable::Connection::WebSocket.new(env, self, stream_event_loop) @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) @@ -70,10 +70,6 @@ module ActionCable logger.info started_request_message if websocket.possible? && allow_request_origin? - websocket.on(:open) { |event| send_async :on_open } - websocket.on(:message) { |event| on_message event.data } - websocket.on(:close) { |event| send_async :on_close } - respond_to_successful_request else respond_to_invalid_request @@ -121,6 +117,22 @@ module ActionCable transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i) end + def on_open # :nodoc: + send_async :handle_open + end + + def on_message(message) # :nodoc: + message_buffer.append message + end + + def on_error(message) # :nodoc: + # ignore + end + + def on_close # :nodoc: + send_async :handle_close + end + protected # The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc. def request @@ -139,7 +151,7 @@ module ActionCable attr_reader :message_buffer private - def on_open + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel beat @@ -150,11 +162,7 @@ module ActionCable respond_to_invalid_request end - def on_message(message) - message_buffer.append message - end - - def on_close + def handle_close logger.info finished_request_message server.remove_connection(self) diff --git a/actioncable/lib/action_cable/connection/client_socket.rb b/actioncable/lib/action_cable/connection/client_socket.rb new file mode 100644 index 0000000000..62dd753646 --- /dev/null +++ b/actioncable/lib/action_cable/connection/client_socket.rb @@ -0,0 +1,152 @@ +require 'websocket/driver' + +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class ClientSocket # :nodoc: + def self.determine_url(env) + scheme = secure_request?(env) ? 'wss:' : 'ws:' + "#{ scheme }//#{ env['HTTP_HOST'] }#{ env['REQUEST_URI'] }" + end + + def self.secure_request?(env) + return true if env['HTTPS'] == 'on' + return true if env['HTTP_X_FORWARDED_SSL'] == 'on' + return true if env['HTTP_X_FORWARDED_SCHEME'] == 'https' + return true if env['HTTP_X_FORWARDED_PROTO'] == 'https' + return true if env['rack.url_scheme'] == 'https' + + return false + end + + CONNECTING = 0 + OPEN = 1 + CLOSING = 2 + CLOSED = 3 + + attr_reader :env, :url + + def initialize(env, event_target, stream_event_loop) + @env = env + @event_target = event_target + @stream_event_loop = stream_event_loop + + @url = ClientSocket.determine_url(@env) + + @driver = @driver_started = nil + + @ready_state = CONNECTING + + # The driver calls +env+, +url+, and +write+ + @driver = ::WebSocket::Driver.rack(self) + + @driver.on(:open) { |e| open } + @driver.on(:message) { |e| receive_message(e.data) } + @driver.on(:close) { |e| begin_close(e.reason, e.code) } + @driver.on(:error) { |e| emit_error(e.message) } + + @stream = ActionCable::Connection::Stream.new(@stream_event_loop, self) + + if callback = @env['async.callback'] + callback.call([101, {}, @stream]) + end + end + + def start_driver + return if @driver.nil? || @driver_started + @driver_started = true + @driver.start + end + + def rack_response + start_driver + [ -1, {}, [] ] + end + + def write(data) + @stream.write(data) + end + + def transmit(message) + return false if @ready_state > OPEN + case message + when Numeric then @driver.text(message.to_s) + when String then @driver.text(message) + when Array then @driver.binary(message) + else false + end + end + + def close(code = nil, reason = nil) + code ||= 1000 + reason ||= '' + + unless code == 1000 or (code >= 3000 and code <= 4999) + raise ArgumentError, "Failed to execute 'close' on WebSocket: " + + "The code must be either 1000, or between 3000 and 4999. " + + "#{code} is neither." + end + + @ready_state = CLOSING unless @ready_state == CLOSED + @driver.close(reason, code) + end + + def parse(data) + @driver.parse(data) + end + + def client_gone + finalize_close + end + + def alive? + @ready_state == OPEN + end + + private + def open + return unless @ready_state == CONNECTING + @ready_state = OPEN + + @event_target.on_open + end + + def receive_message(data) + return unless @ready_state == OPEN + + @event_target.on_message(data) + end + + def emit_error(message) + return if @ready_state >= CLOSING + + @event_target.on_error(message) + end + + def begin_close(reason, code) + return if @ready_state == CLOSED + @ready_state = CLOSING + @close_params = [reason, code] + + if @stream + @stream.shutdown + else + finalize_close + end + end + + def finalize_close + return if @ready_state == CLOSED + @ready_state = CLOSED + + reason = @close_params ? @close_params[0] : '' + code = @close_params ? @close_params[1] : 1006 + + @event_target.on_close(code, reason) + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb new file mode 100644 index 0000000000..ace250cd16 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -0,0 +1,59 @@ +module ActionCable + module Connection + #-- + # This class is heavily based on faye-websocket-ruby + # + # Copyright (c) 2010-2015 James Coglan + class Stream + def initialize(event_loop, socket) + @event_loop = event_loop + @socket_object = socket + @stream_send = socket.env['stream.send'] + + @rack_hijack_io = nil + + hijack_rack_socket + end + + def each(&callback) + @stream_send ||= callback + end + + def close + shutdown + @socket_object.client_gone + end + + def shutdown + clean_rack_hijack + end + + def write(data) + return @rack_hijack_io.write(data) if @rack_hijack_io + return @stream_send.call(data) if @stream_send + rescue EOFError + @socket_object.client_gone + end + + def receive(data) + @socket_object.parse(data) + end + + private + def hijack_rack_socket + return unless @socket_object.env['rack.hijack'] + + @socket_object.env['rack.hijack'].call + @rack_hijack_io = @socket_object.env['rack.hijack_io'] + + @event_loop.attach(@rack_hijack_io, self) + end + + def clean_rack_hijack + return unless @rack_hijack_io + @event_loop.detach(@rack_hijack_io, self) + @rack_hijack_io = nil + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/stream_event_loop.rb b/actioncable/lib/action_cable/connection/stream_event_loop.rb new file mode 100644 index 0000000000..f773814973 --- /dev/null +++ b/actioncable/lib/action_cable/connection/stream_event_loop.rb @@ -0,0 +1,68 @@ +require 'nio' + +module ActionCable + module Connection + class StreamEventLoop + def initialize + @nio = NIO::Selector.new + @map = {} + @stopping = false + @todo = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + run + end + end + + def attach(io, stream) + @todo << lambda do + @map[io] = stream + @nio.register(io, :r) + end + @nio.wakeup + end + + def detach(io, stream) + @todo << lambda do + @nio.deregister(io) + @map.delete io + end + @nio.wakeup + end + + def stop + @stopping = true + @nio.wakeup + end + + def run + loop do + if @stopping + @nio.close + break + end + + until @todo.empty? + @todo.pop(true).call + end + + if monitors = @nio.select + monitors.each do |monitor| + io = monitor.io + stream = @map[io] + + begin + stream.receive io.read_nonblock(4096) + rescue IO::WaitReadable + next + rescue EOFError + stream.close + end + end + end + end + end + end + end +end diff --git a/actioncable/lib/action_cable/connection/web_socket.rb b/actioncable/lib/action_cable/connection/web_socket.rb index 670d5690ae..5e89fb9b72 100644 --- a/actioncable/lib/action_cable/connection/web_socket.rb +++ b/actioncable/lib/action_cable/connection/web_socket.rb @@ -1,13 +1,11 @@ -require 'faye/websocket' +require 'websocket/driver' module ActionCable module Connection - # Decorate the Faye::WebSocket with helpers we need. + # Wrap the real socket to minimize the externally-presented API class WebSocket - delegate :rack_response, :close, :on, to: :websocket - - def initialize(env) - @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil + def initialize(env, event_target, stream_event_loop) + @websocket = ::WebSocket::Driver.websocket?(env) ? ClientSocket.new(env, event_target, stream_event_loop) : nil end def possible? @@ -15,11 +13,19 @@ module ActionCable end def alive? - websocket && websocket.ready_state == Faye::WebSocket::API::OPEN + websocket && websocket.alive? end def transmit(data) - websocket.send data + websocket.transmit data + end + + def close + websocket.close + end + + def rack_response + websocket.rack_response end protected diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3385a4c9f3..b00abd208c 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -32,6 +32,10 @@ module ActionCable @remote_connections ||= RemoteConnections.new(self) end + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new + end + # The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size. def worker_pool @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index 579578d0a7..e2b017a9a1 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -55,11 +55,11 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase test "on connection open" do run_in_eventmachine do connection = open_connection - connection.process connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/)) connection.message_buffer.expects(:process!) + connection.process wait_for_async assert_equal [ connection ], @server.connections @@ -74,11 +74,11 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase # Setup the connection Concurrent::TimerTask.stubs(:new).returns(true) - connection.send :on_open + connection.send :handle_open assert connection.connected connection.subscriptions.expects(:unsubscribe_from_all) - connection.send :on_close + connection.send :handle_close assert ! connection.connected assert_equal [], @server.connections diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index a110dfdee0..1019ad541e 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -68,10 +68,10 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/connection/multiple_identifiers_test.rb b/actioncable/test/connection/multiple_identifiers_test.rb index 55a9f96cb3..e9bb4e6d7f 100644 --- a/actioncable/test/connection/multiple_identifiers_test.rb +++ b/actioncable/test/connection/multiple_identifiers_test.rb @@ -32,10 +32,10 @@ class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase @connection = Connection.new(server, env) @connection.process - @connection.send :on_open + @connection.send :handle_open end def close_connection - @connection.send :on_close + @connection.send :handle_close end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 6e6541a952..56d132b30a 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -14,6 +14,7 @@ class TestServer @config.subscription_adapter.new(self) end - def send_async + def stream_event_loop + @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new end end -- cgit v1.2.3 From 503fe757c7f5f917deab95acdcd421a1dede05c7 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Fri, 22 Jan 2016 04:14:03 +1030 Subject: Ditch the EM error logging helper We're no longer doing our work in the EM event loop, so errors are quite unlikely, and if they do occur, they're not really our responsibility to handle. --- actioncable/lib/action_cable/process/logging.rb | 8 -------- actioncable/test/subscription_adapter/common.rb | 1 - railties/lib/rails/generators/rails/app/templates/config.ru.tt | 3 +-- 3 files changed, 1 insertion(+), 11 deletions(-) delete mode 100644 actioncable/lib/action_cable/process/logging.rb diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb deleted file mode 100644 index ebb8990891..0000000000 --- a/actioncable/lib/action_cable/process/logging.rb +++ /dev/null @@ -1,8 +0,0 @@ -require 'action_cable/server' - -if defined?(::EventMachine) - EventMachine.error_handler do |e| - puts "Error raised inside the event loop: #{e.message}" - puts e.backtrace.join("\n") - end -end diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 5efc6d6b81..361858784e 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -1,7 +1,6 @@ require 'test_helper' require 'concurrent' -require 'action_cable/process/logging' require 'active_support/core_ext/hash/indifferent_access' require 'pathname' diff --git a/railties/lib/rails/generators/rails/app/templates/config.ru.tt b/railties/lib/rails/generators/rails/app/templates/config.ru.tt index 70556fcc99..343c0833d7 100644 --- a/railties/lib/rails/generators/rails/app/templates/config.ru.tt +++ b/railties/lib/rails/generators/rails/app/templates/config.ru.tt @@ -3,9 +3,8 @@ require ::File.expand_path('../config/environment', __FILE__) <%- unless options[:skip_action_cable] -%> -# Action Cable uses EventMachine which requires that all classes are loaded in advance +# Action Cable requires that all classes are loaded in advance Rails.application.eager_load! -require 'action_cable/process/logging' <%- end -%> run Rails.application -- cgit v1.2.3