aboutsummaryrefslogblamecommitdiffstats
path: root/actioncable/test/client_test.rb
blob: b185654c7124c62c52768c530477a9d7856570bf (plain) (tree)













































































































































































































































                                                                                                                                           
require 'test_helper'
require 'concurrent'

require 'active_support/core_ext/hash/indifferent_access'
require 'pathname'

require 'faye/websocket'
require 'json'

class ClientTest < ActionCable::TestCase
  WAIT_WHEN_EXPECTING_EVENT = 3
  WAIT_WHEN_NOT_EXPECTING_EVENT = 0.2

  def setup
    # TODO: ActionCable requires a *lot* of setup at the moment...
    ::Object.const_set(:ApplicationCable, Module.new)
    ::ApplicationCable.const_set(:Connection, Class.new(ActionCable::Connection::Base))

    ::Object.const_set(:Rails, Module.new)
    ::Rails.singleton_class.send(:define_method, :root) { Pathname.new(__dir__) }

    ActionCable.instance_variable_set(:@server, nil)
    server = ActionCable.server
    server.config = ActionCable::Server::Configuration.new
    inner_logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
    server.config.logger = ActionCable::Connection::TaggedLoggerProxy.new(inner_logger, tags: [])

    server.config.cable = { adapter: 'async' }.with_indifferent_access

    # and now the "real" setup for our test:
    server.config.disable_request_forgery_protection = true
    server.config.channel_load_paths = [File.expand_path('client', __dir__)]

    @reactor_mutex = Mutex.new
    @reactor_users = 0
    @reactor_running = Concurrent::Event.new

    # faye-websocket is warning-rich
    @previous_verbose, $VERBOSE = $VERBOSE, nil
  end

  def teardown
    $VERBOSE = @previous_verbose

    if @reactor_running.set?
      EM.stop
    end

    begin
      ::Object.send(:remove_const, :ApplicationCable)
    rescue NameError
    end
    begin
      ::Object.send(:remove_const, :Rails)
    rescue NameError
    end
  end

  def with_puma_server(rack_app = ActionCable.server, port = 3099)
    server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
    server.add_tcp_listener '127.0.0.1', port
    server.min_threads = 1
    server.max_threads = 4

    t = Thread.new { server.run.join }
    yield port

  ensure
    server.stop(true)
    t.join
  end

  def start_event_machine
    @reactor_mutex.synchronize do
      unless @reactor_running.set?
        @reactor ||= Thread.new do
          EM.next_tick do
            @reactor_running.set
          end
          EM.run
        end
        @reactor_running.wait(WAIT_WHEN_EXPECTING_EVENT)
      end
      @reactor_users += 1
    end
  end

  def stop_event_machine
    @reactor_mutex.synchronize do
      @reactor_users -= 1
      if @reactor_users < 1
        @reactor_running.reset
        EM.stop
        @reactor = nil
      end
    end
  end

  class SyncClient
    attr_reader :pings

    def initialize(em_controller, port)
      em_controller.start_event_machine

      @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
      @messages = Queue.new
      @closed = Concurrent::Event.new
      @has_messages = Concurrent::Event.new
      @pings = 0

      open = Concurrent::Event.new
      error = nil

      @ws.on(:error) do |event|
        if open.set?
          @messages << RuntimeError.new(event.message)
        else
          error = event.message
          open.set
        end
      end

      @ws.on(:open) do |event|
        open.set
      end

      @ws.on(:message) do |event|
        hash = JSON.parse(event.data)
        if hash['identifier'] == '_ping'
          @pings += 1
        else
          @messages << hash
          @has_messages.set
        end
      end

      @ws.on(:close) do |event|
        @ws = nil
        em_controller.stop_event_machine
        @closed.set
      end

      open.wait(WAIT_WHEN_EXPECTING_EVENT)
      raise error if error
    end

    def read_message
      @has_messages.wait(WAIT_WHEN_EXPECTING_EVENT) if @messages.empty?
      @has_messages.reset if @messages.size < 2

      msg = @messages.pop(true)
      raise msg if msg.is_a?(Exception)

      msg
    end

    def read_messages
      list = []
      loop do
        @has_messages.wait(WAIT_WHEN_NOT_EXPECTING_EVENT)
        if @has_messages.set?
          list << read_message
        else
          break
        end
      end
      list
    end

    def send_message(hash)
      @ws.send(JSON.dump(hash))
    end

    def close
      sleep WAIT_WHEN_NOT_EXPECTING_EVENT

      unless @messages.empty?
        raise "#{@messages.size} messages unprocessed"
      end

      @ws.close
      @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
    end
  end

  def faye_client(port)
    SyncClient.new(self, port)
  end

  def test_single_client
    with_puma_server do |port|
      c = faye_client(port)
      c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
      c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
      c.close
    end
  end

  def test_interacting_clients
    with_puma_server do |port|
      clients = 20.times.map { faye_client(port) }

      barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
      barrier_2 = Concurrent::CyclicBarrier.new(clients.size)

      clients.map {|c| Concurrent::Future.execute {
        c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
        assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
        c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
        assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
        barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
        c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
        barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
        sleep 1
        assert_equal clients.size, c.read_messages.size
      } }.each(&:wait!)

      clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
    end
  end

  def test_many_clients
    with_puma_server do |port|
      clients = 200.times.map { faye_client(port) }

      clients.map {|c| Concurrent::Future.execute {
        c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
        assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
        c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
        assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
      } }.each(&:wait!)

      clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
    end
  end
end