diff options
Diffstat (limited to 'actioncable/lib')
28 files changed, 880 insertions, 78 deletions
diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index e7456e3c1b..ad5fd43155 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true #-- -# Copyright (c) 2015-2018 Basecamp, LLC +# Copyright (c) 2015-2019 Basecamp, LLC # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the @@ -32,13 +32,19 @@ module ActionCable INTERNAL = { message_types: { - welcome: "welcome".freeze, - ping: "ping".freeze, - confirmation: "confirm_subscription".freeze, - rejection: "reject_subscription".freeze + welcome: "welcome", + disconnect: "disconnect", + ping: "ping", + confirmation: "confirm_subscription", + rejection: "reject_subscription" }, - default_mount_path: "/cable".freeze, - protocols: ["actioncable-v1-json".freeze, "actioncable-unsupported".freeze].freeze + disconnect_reasons: { + unauthorized: "unauthorized", + invalid_request: "invalid_request", + server_restart: "server_restart" + }, + default_mount_path: "/cable", + protocols: ["actioncable-v1-json", "actioncable-unsupported"].freeze } # Singleton instance of the server @@ -51,4 +57,6 @@ module ActionCable autoload :Channel autoload :RemoteConnections autoload :SubscriptionAdapter + autoload :TestHelper + autoload :TestCase end diff --git a/actioncable/lib/action_cable/channel.rb b/actioncable/lib/action_cable/channel.rb index d2f6fbbbc7..d5118b9dc9 100644 --- a/actioncable/lib/action_cable/channel.rb +++ b/actioncable/lib/action_cable/channel.rb @@ -11,6 +11,7 @@ module ActionCable autoload :Naming autoload :PeriodicTimers autoload :Streams + autoload :TestCase end end end diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index c5ad749bfe..af061c843a 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "set" +require "active_support/rescuable" module ActionCable module Channel @@ -99,6 +100,7 @@ module ActionCable include Streams include Naming include Broadcasting + include ActiveSupport::Rescuable attr_reader :params, :connection, :identifier delegate :logger, to: :connection @@ -267,10 +269,12 @@ module ActionCable else public_send action end + rescue Exception => exception + rescue_with_handler(exception) || raise end def action_signature(action, data) - "#{self.class.name}##{action}".dup.tap do |signature| + (+"#{self.class.name}##{action}").tap do |signature| if (arguments = data.except("action")).any? signature << "(#{arguments.inspect})" end @@ -303,3 +307,5 @@ module ActionCable end end end + +ActiveSupport.run_load_hooks(:action_cable_channel, ActionCable::Channel::Base) diff --git a/actioncable/lib/action_cable/channel/broadcasting.rb b/actioncable/lib/action_cable/channel/broadcasting.rb index 9a96720f4a..9f702e425e 100644 --- a/actioncable/lib/action_cable/channel/broadcasting.rb +++ b/actioncable/lib/action_cable/channel/broadcasting.rb @@ -7,22 +7,32 @@ module ActionCable module Broadcasting extend ActiveSupport::Concern - delegate :broadcasting_for, to: :class + delegate :broadcasting_for, :broadcast_to, to: :class module ClassMethods # Broadcast a hash to a unique broadcasting for this <tt>model</tt> in this channel. def broadcast_to(model, message) - ActionCable.server.broadcast(broadcasting_for([ channel_name, model ]), message) + ActionCable.server.broadcast(broadcasting_for(model), message) end - def broadcasting_for(model) #:nodoc: + # Returns a unique broadcasting identifier for this <tt>model</tt> in this channel: + # + # CommentsChannel.broadcasting_for("all") # => "comments:all" + # + # You can pass any object as a target (e.g. Active Record model), and it + # would be serialized into a string under the hood. + def broadcasting_for(model) + serialize_broadcasting([ channel_name, model ]) + end + + def serialize_broadcasting(object) #:nodoc: case - when model.is_a?(Array) - model.map { |m| broadcasting_for(m) }.join(":") - when model.respond_to?(:to_gid_param) - model.to_gid_param + when object.is_a?(Array) + object.map { |m| serialize_broadcasting(m) }.join(":") + when object.respond_to?(:to_gid_param) + object.to_gid_param else - model.to_param + object.to_param end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 81c2c38064..7e1ed3c850 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -99,7 +99,7 @@ module ActionCable # Pass <tt>coder: ActiveSupport::JSON</tt> to decode messages as JSON before passing to the callback. # Defaults to <tt>coder: nil</tt> which does no decoding, passes raw messages. def stream_for(model, callback = nil, coder: nil, &block) - stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) + stream_from(broadcasting_for(model), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. diff --git a/actioncable/lib/action_cable/channel/test_case.rb b/actioncable/lib/action_cable/channel/test_case.rb new file mode 100644 index 0000000000..b05d51a61a --- /dev/null +++ b/actioncable/lib/action_cable/channel/test_case.rb @@ -0,0 +1,310 @@ +# frozen_string_literal: true + +require "active_support" +require "active_support/test_case" +require "active_support/core_ext/hash/indifferent_access" +require "json" + +module ActionCable + module Channel + class NonInferrableChannelError < ::StandardError + def initialize(name) + super "Unable to determine the channel to test from #{name}. " + + "You'll need to specify it using `tests YourChannel` in your " + + "test case definition." + end + end + + # Stub `stream_from` to track streams for the channel. + # Add public aliases for `subscription_confirmation_sent?` and + # `subscription_rejected?`. + module ChannelStub + def confirmed? + subscription_confirmation_sent? + end + + def rejected? + subscription_rejected? + end + + def stream_from(broadcasting, *) + streams << broadcasting + end + + def stop_all_streams + @_streams = [] + end + + def streams + @_streams ||= [] + end + + # Make periodic timers no-op + def start_periodic_timers; end + alias stop_periodic_timers start_periodic_timers + end + + class ConnectionStub + attr_reader :transmissions, :identifiers, :subscriptions, :logger + + def initialize(identifiers = {}) + @transmissions = [] + + identifiers.each do |identifier, val| + define_singleton_method(identifier) { val } + end + + @subscriptions = ActionCable::Connection::Subscriptions.new(self) + @identifiers = identifiers.keys + @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) + end + + def transmit(cable_message) + transmissions << cable_message.with_indifferent_access + end + end + + # Superclass for Action Cable channel functional tests. + # + # == Basic example + # + # Functional tests are written as follows: + # 1. First, one uses the +subscribe+ method to simulate subscription creation. + # 2. Then, one asserts whether the current state is as expected. "State" can be anything: + # transmitted messages, subscribed streams, etc. + # + # For example: + # + # class ChatChannelTest < ActionCable::Channel::TestCase + # def test_subscribed_with_room_number + # # Simulate a subscription creation + # subscribe room_number: 1 + # + # # Asserts that the subscription was successfully created + # assert subscription.confirmed? + # + # # Asserts that the channel subscribes connection to a stream + # assert_has_stream "chat_1" + # + # # Asserts that the channel subscribes connection to a specific + # # stream created for a model + # assert_has_stream_for Room.find(1) + # end + # + # def test_does_not_stream_with_incorrect_room_number + # subscribe room_number: -1 + # + # # Asserts that not streams was started + # assert_no_streams + # end + # + # def test_does_not_subscribe_without_room_number + # subscribe + # + # # Asserts that the subscription was rejected + # assert subscription.rejected? + # end + # end + # + # You can also perform actions: + # def test_perform_speak + # subscribe room_number: 1 + # + # perform :speak, message: "Hello, Rails!" + # + # assert_equal "Hello, Rails!", transmissions.last["text"] + # end + # + # == Special methods + # + # ActionCable::Channel::TestCase will also automatically provide the following instance + # methods for use in the tests: + # + # <b>connection</b>:: + # An ActionCable::Channel::ConnectionStub, representing the current HTTP connection. + # <b>subscription</b>:: + # An instance of the current channel, created when you call `subscribe`. + # <b>transmissions</b>:: + # A list of all messages that have been transmitted into the channel. + # + # + # == Channel is automatically inferred + # + # ActionCable::Channel::TestCase will automatically infer the channel under test + # from the test class name. If the channel cannot be inferred from the test + # class name, you can explicitly set it with +tests+. + # + # class SpecialEdgeCaseChannelTest < ActionCable::Channel::TestCase + # tests SpecialChannel + # end + # + # == Specifying connection identifiers + # + # You need to set up your connection manually to provide values for the identifiers. + # To do this just use: + # + # stub_connection(user: users(:john)) + # + # == Testing broadcasting + # + # ActionCable::Channel::TestCase enhances ActionCable::TestHelper assertions (e.g. + # +assert_broadcasts+) to handle broadcasting to models: + # + # + # # in your channel + # def speak(data) + # broadcast_to room, text: data["message"] + # end + # + # def test_speak + # subscribe room_id: rooms(:chat).id + # + # assert_broadcasts_on(rooms(:chat), text: "Hello, Rails!") do + # perform :speak, message: "Hello, Rails!" + # end + # end + class TestCase < ActiveSupport::TestCase + module Behavior + extend ActiveSupport::Concern + + include ActiveSupport::Testing::ConstantLookup + include ActionCable::TestHelper + + CHANNEL_IDENTIFIER = "test_stub" + + included do + class_attribute :_channel_class + + attr_reader :connection, :subscription + + ActiveSupport.run_load_hooks(:action_cable_channel_test_case, self) + end + + module ClassMethods + def tests(channel) + case channel + when String, Symbol + self._channel_class = channel.to_s.camelize.constantize + when Module + self._channel_class = channel + else + raise NonInferrableChannelError.new(channel) + end + end + + def channel_class + if channel = self._channel_class + channel + else + tests determine_default_channel(name) + end + end + + def determine_default_channel(name) + channel = determine_constant_from_test_name(name) do |constant| + Class === constant && constant < ActionCable::Channel::Base + end + raise NonInferrableChannelError.new(name) if channel.nil? + channel + end + end + + # Setup test connection with the specified identifiers: + # + # class ApplicationCable < ActionCable::Connection::Base + # identified_by :user, :token + # end + # + # stub_connection(user: users[:john], token: 'my-secret-token') + def stub_connection(identifiers = {}) + @connection = ConnectionStub.new(identifiers) + end + + # Subscribe to the channel under test. Optionally pass subscription parameters as a Hash. + def subscribe(params = {}) + @connection ||= stub_connection + @subscription = self.class.channel_class.new(connection, CHANNEL_IDENTIFIER, params.with_indifferent_access) + @subscription.singleton_class.include(ChannelStub) + @subscription.subscribe_to_channel + @subscription + end + + # Unsubscribe the subscription under test. + def unsubscribe + check_subscribed! + subscription.unsubscribe_from_channel + end + + # Perform action on a channel. + # + # NOTE: Must be subscribed. + def perform(action, data = {}) + check_subscribed! + subscription.perform_action(data.stringify_keys.merge("action" => action.to_s)) + end + + # Returns messages transmitted into channel + def transmissions + # Return only directly sent message (via #transmit) + connection.transmissions.map { |data| data["message"] }.compact + end + + # Enhance TestHelper assertions to handle non-String + # broadcastings + def assert_broadcasts(stream_or_object, *args) + super(broadcasting_for(stream_or_object), *args) + end + + def assert_broadcast_on(stream_or_object, *args) + super(broadcasting_for(stream_or_object), *args) + end + + # Asserts that no streams have been started. + # + # def test_assert_no_started_stream + # subscribe + # assert_no_streams + # end + # + def assert_no_streams + assert subscription.streams.empty?, "No streams started was expected, but #{subscription.streams.count} found" + end + + # Asserts that the specified stream has been started. + # + # def test_assert_started_stream + # subscribe + # assert_has_stream 'messages' + # end + # + def assert_has_stream(stream) + assert subscription.streams.include?(stream), "Stream #{stream} has not been started" + end + + # Asserts that the specified stream for a model has started. + # + # def test_assert_started_stream_for + # subscribe id: 42 + # assert_has_stream_for User.find(42) + # end + # + def assert_has_stream_for(object) + assert_has_stream(broadcasting_for(object)) + end + + private + def check_subscribed! + raise "Must be subscribed!" if subscription.nil? || subscription.rejected? + end + + def broadcasting_for(stream_or_object) + return stream_or_object if stream_or_object.is_a?(String) + + self.class.channel_class.broadcasting_for(stream_or_object) + end + end + + include Behavior + end + end +end diff --git a/actioncable/lib/action_cable/connection.rb b/actioncable/lib/action_cable/connection.rb index 804b89a707..20b5dbe78d 100644 --- a/actioncable/lib/action_cable/connection.rb +++ b/actioncable/lib/action_cable/connection.rb @@ -15,6 +15,7 @@ module ActionCable autoload :StreamEventLoop autoload :Subscriptions autoload :TaggedLoggerProxy + autoload :TestCase autoload :WebSocket end end diff --git a/actioncable/lib/action_cable/connection/authorization.rb b/actioncable/lib/action_cable/connection/authorization.rb index a22179d988..aef3386f71 100644 --- a/actioncable/lib/action_cable/connection/authorization.rb +++ b/actioncable/lib/action_cable/connection/authorization.rb @@ -5,7 +5,7 @@ module ActionCable module Authorization class UnauthorizedError < StandardError; end - # Closes the \WebSocket connection if it is open and returns a 404 "File not Found" response. + # Closes the WebSocket connection if it is open and returns a 404 "File not Found" response. def reject_unauthorized_connection logger.error "An unauthorized connection attempt was rejected" raise UnauthorizedError diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 11a1f1a5e8..c469f7066c 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -95,7 +95,12 @@ module ActionCable end # Close the WebSocket connection. - def close + def close(reason: nil, reconnect: true) + transmit( + type: ActionCable::INTERNAL[:message_types][:disconnect], + reason: reason, + reconnect: reconnect + ) websocket.close end @@ -170,7 +175,7 @@ module ActionCable message_buffer.process! server.add_connection(self) rescue ActionCable::Connection::Authorization::UnauthorizedError - respond_to_invalid_request + close(reason: ActionCable::INTERNAL[:disconnect_reasons][:unauthorized], reconnect: false) if websocket.alive? end def handle_close @@ -211,7 +216,7 @@ module ActionCable end def respond_to_invalid_request - close if websocket.alive? + close(reason: ActionCable::INTERNAL[:disconnect_reasons][:invalid_request]) if websocket.alive? logger.error invalid_request_message logger.info finished_request_message @@ -255,3 +260,5 @@ module ActionCable end end end + +ActiveSupport.run_load_hooks(:action_cable_connection, ActionCable::Connection::Base) diff --git a/actioncable/lib/action_cable/connection/stream.rb b/actioncable/lib/action_cable/connection/stream.rb index 4873026b71..e658948a55 100644 --- a/actioncable/lib/action_cable/connection/stream.rb +++ b/actioncable/lib/action_cable/connection/stream.rb @@ -98,8 +98,10 @@ module ActionCable def hijack_rack_socket return unless @socket_object.env["rack.hijack"] - @socket_object.env["rack.hijack"].call - @rack_hijack_io = @socket_object.env["rack.hijack_io"] + # This should return the underlying io according to the SPEC: + @rack_hijack_io = @socket_object.env["rack.hijack"].call + # Retain existing behaviour if required: + @rack_hijack_io ||= @socket_object.env["rack.hijack_io"] @event_loop.attach(@rack_hijack_io, self) end diff --git a/actioncable/lib/action_cable/connection/test_case.rb b/actioncable/lib/action_cable/connection/test_case.rb new file mode 100644 index 0000000000..f1673fea08 --- /dev/null +++ b/actioncable/lib/action_cable/connection/test_case.rb @@ -0,0 +1,234 @@ +# frozen_string_literal: true + +require "active_support" +require "active_support/test_case" +require "active_support/core_ext/hash/indifferent_access" +require "action_dispatch" +require "action_dispatch/http/headers" +require "action_dispatch/testing/test_request" + +module ActionCable + module Connection + class NonInferrableConnectionError < ::StandardError + def initialize(name) + super "Unable to determine the connection to test from #{name}. " + + "You'll need to specify it using `tests YourConnection` in your " + + "test case definition." + end + end + + module Assertions + # Asserts that the connection is rejected (via +reject_unauthorized_connection+). + # + # # Asserts that connection without user_id fails + # assert_reject_connection { connect params: { user_id: '' } } + def assert_reject_connection(&block) + assert_raises(Authorization::UnauthorizedError, "Expected to reject connection but no rejection was made", &block) + end + end + + # We don't want to use the whole "encryption stack" for connection + # unit-tests, but we want to make sure that users test against the correct types + # of cookies (i.e. signed or encrypted or plain) + class TestCookieJar < ActiveSupport::HashWithIndifferentAccess + def signed + self[:signed] ||= {}.with_indifferent_access + end + + def encrypted + self[:encrypted] ||= {}.with_indifferent_access + end + end + + class TestRequest < ActionDispatch::TestRequest + attr_accessor :session, :cookie_jar + end + + module TestConnection + attr_reader :logger, :request + + def initialize(request) + inner_logger = ActiveSupport::Logger.new(StringIO.new) + tagged_logging = ActiveSupport::TaggedLogging.new(inner_logger) + @logger = ActionCable::Connection::TaggedLoggerProxy.new(tagged_logging, tags: []) + @request = request + @env = request.env + end + end + + # Unit test Action Cable connections. + # + # Useful to check whether a connection's +identified_by+ gets assigned properly + # and that any improper connection requests are rejected. + # + # == Basic example + # + # Unit tests are written as follows: + # + # 1. Simulate a connection attempt by calling +connect+. + # 2. Assert state, e.g. identifiers, has been assigned. + # + # + # class ApplicationCable::ConnectionTest < ActionCable::Connection::TestCase + # def test_connects_with_proper_cookie + # # Simulate the connection request with a cookie. + # cookies["user_id"] = users(:john).id + # + # connect + # + # # Assert the connection identifier matches the fixture. + # assert_equal users(:john).id, connection.user.id + # end + # + # def test_rejects_connection_without_proper_cookie + # assert_reject_connection { connect } + # end + # end + # + # +connect+ accepts additional information the HTTP request with the + # +params+, +headers+, +session+ and Rack +env+ options. + # + # def test_connect_with_headers_and_query_string + # connect params: { user_id: 1 }, headers: { "X-API-TOKEN" => "secret-my" } + # + # assert_equal "1", connection.user.id + # assert_equal "secret-my", connection.token + # end + # + # def test_connect_with_params + # connect params: { user_id: 1 } + # + # assert_equal "1", connection.user.id + # end + # + # You can also setup the correct cookies before the connection request: + # + # def test_connect_with_cookies + # # Plain cookies: + # cookies["user_id"] = 1 + # + # # Or signed/encrypted: + # # cookies.signed["user_id"] = 1 + # # cookies.encrypted["user_id"] = 1 + # + # connect + # + # assert_equal "1", connection.user_id + # end + # + # == Connection is automatically inferred + # + # ActionCable::Connection::TestCase will automatically infer the connection under test + # from the test class name. If the channel cannot be inferred from the test + # class name, you can explicitly set it with +tests+. + # + # class ConnectionTest < ActionCable::Connection::TestCase + # tests ApplicationCable::Connection + # end + # + class TestCase < ActiveSupport::TestCase + module Behavior + extend ActiveSupport::Concern + + DEFAULT_PATH = "/cable" + + include ActiveSupport::Testing::ConstantLookup + include Assertions + + included do + class_attribute :_connection_class + + attr_reader :connection + + ActiveSupport.run_load_hooks(:action_cable_connection_test_case, self) + end + + module ClassMethods + def tests(connection) + case connection + when String, Symbol + self._connection_class = connection.to_s.camelize.constantize + when Module + self._connection_class = connection + else + raise NonInferrableConnectionError.new(connection) + end + end + + def connection_class + if connection = self._connection_class + connection + else + tests determine_default_connection(name) + end + end + + def determine_default_connection(name) + connection = determine_constant_from_test_name(name) do |constant| + Class === constant && constant < ActionCable::Connection::Base + end + raise NonInferrableConnectionError.new(name) if connection.nil? + connection + end + end + + # Performs connection attempt to exert #connect on the connection under test. + # + # Accepts request path as the first argument and the following request options: + # + # - params – URL parameters (Hash) + # - headers – request headers (Hash) + # - session – session data (Hash) + # - env – additional Rack env configuration (Hash) + def connect(path = ActionCable.server.config.mount_path, **request_params) + path ||= DEFAULT_PATH + + connection = self.class.connection_class.allocate + connection.singleton_class.include(TestConnection) + connection.send(:initialize, build_test_request(path, request_params)) + connection.connect if connection.respond_to?(:connect) + + # Only set instance variable if connected successfully + @connection = connection + end + + # Exert #disconnect on the connection under test. + def disconnect + raise "Must be connected!" if connection.nil? + + connection.disconnect if connection.respond_to?(:disconnect) + @connection = nil + end + + def cookies + @cookie_jar ||= TestCookieJar.new + end + + private + def build_test_request(path, params: nil, headers: {}, session: {}, env: {}) + wrapped_headers = ActionDispatch::Http::Headers.from_hash(headers) + + uri = URI.parse(path) + + query_string = params.nil? ? uri.query : params.to_query + + request_env = { + "QUERY_STRING" => query_string, + "PATH_INFO" => uri.path + }.merge(env) + + if wrapped_headers.present? + ActionDispatch::Http::Headers.from_hash(request_env).merge!(wrapped_headers) + end + + TestRequest.create(request_env).tap do |request| + request.session = session.with_indifferent_access + request.cookie_jar = cookies + end + end + end + + include Behavior + end + end +end diff --git a/actioncable/lib/action_cable/gem_version.rb b/actioncable/lib/action_cable/gem_version.rb index cd1d9bccef..2be81736c6 100644 --- a/actioncable/lib/action_cable/gem_version.rb +++ b/actioncable/lib/action_cable/gem_version.rb @@ -10,7 +10,7 @@ module ActionCable MAJOR = 6 MINOR = 0 TINY = 0 - PRE = "alpha" + PRE = "beta3" STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".") end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 1ee03f6dfc..98b3743175 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -12,14 +12,17 @@ module ActionCable include ActionCable::Server::Broadcasting include ActionCable::Server::Connections - cattr_accessor :config, instance_accessor: true, default: ActionCable::Server::Configuration.new + cattr_accessor :config, instance_accessor: false, default: ActionCable::Server::Configuration.new + + attr_reader :config def self.logger; config.logger; end delegate :logger, to: :config attr_reader :mutex - def initialize + def initialize(config: self.class.config) + @config = config @mutex = Monitor.new @remote_connections = @event_loop = @worker_pool = @pubsub = nil end @@ -36,7 +39,9 @@ module ActionCable end def restart - connections.each(&:close) + connections.each do |connection| + connection.close(reason: ActionCable::INTERNAL[:disconnect_reasons][:server_restart]) + end @mutex.synchronize do # Shutdown the worker pool diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index c69cc4ac31..187c8f7939 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -56,14 +56,12 @@ module ActionCable def invoke(receiver, method, *args, connection:, &block) work(connection) do - begin - receiver.send method, *args, &block - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + receiver.send method, *args, &block + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") - receiver.handle_exception if receiver.respond_to?(:handle_exception) - end + receiver.handle_exception if receiver.respond_to?(:handle_exception) end end diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb index bcece8d33b..6a9d5c2080 100644 --- a/actioncable/lib/action_cable/subscription_adapter.rb +++ b/actioncable/lib/action_cable/subscription_adapter.rb @@ -5,6 +5,7 @@ module ActionCable extend ActiveSupport::Autoload autoload :Base + autoload :Test autoload :SubscriberMap autoload :ChannelPrefix end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index e384ea4afb..1d60bed4af 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -8,13 +8,15 @@ require "digest/sha1" module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + prepend ChannelPrefix + def initialize(*) super @listener = nil end def broadcast(channel, payload) - with_connection do |pg_conn| + with_broadcast_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel_identifier(channel))}, '#{pg_conn.escape_string(payload)}'") end end @@ -31,14 +33,24 @@ module ActionCable listener.shutdown end - def with_connection(&block) # :nodoc: - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection + def with_subscriptions_connection(&block) # :nodoc: + ar_conn = ActiveRecord::Base.connection_pool.checkout.tap do |conn| + # Action Cable is taking ownership over this database connection, and + # will perform the necessary cleanup tasks + ActiveRecord::Base.connection_pool.remove(conn) + end + pg_conn = ar_conn.raw_connection - unless pg_conn.is_a?(PG::Connection) - raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter" - end + verify!(pg_conn) + yield pg_conn + ensure + ar_conn.disconnect! + end + def with_broadcast_connection(&block) # :nodoc: + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + verify!(pg_conn) yield pg_conn end end @@ -52,6 +64,12 @@ module ActionCable @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end + def verify!(pg_conn) + unless pg_conn.is_a?(PG::Connection) + raise "The Active Record database must be PostgreSQL in order to use the PostgreSQL Action Cable storage adapter" + end + end + class Listener < SubscriberMap def initialize(adapter, event_loop) super() @@ -67,7 +85,7 @@ module ActionCable end def listen - @adapter.with_connection do |pg_conn| + @adapter.with_subscriptions_connection do |pg_conn| catch :shutdown do loop do until @queue.empty? diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index c28951608f..ad8fa52760 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -13,7 +13,8 @@ module ActionCable # Overwrite this factory method for Redis connections if you want to use a different Redis library than the redis gem. # This is needed, for example, when using Makara proxies for distributed Redis. cattr_accessor :redis_connector, default: ->(config) do - ::Redis.new(config.slice(:url, :host, :port, :db, :password)) + config[:id] ||= "ActionCable-PID-#{$$}" + ::Redis.new(config.slice(:url, :host, :port, :db, :password, :id)) end def initialize(*) diff --git a/actioncable/lib/action_cable/subscription_adapter/test.rb b/actioncable/lib/action_cable/subscription_adapter/test.rb new file mode 100644 index 0000000000..ce604cc88e --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/test.rb @@ -0,0 +1,40 @@ +# frozen_string_literal: true + +require_relative "async" + +module ActionCable + module SubscriptionAdapter + # == Test adapter for Action Cable + # + # The test adapter should be used only in testing. Along with + # <tt>ActionCable::TestHelper</tt> it makes a great tool to test your Rails application. + # + # To use the test adapter set +adapter+ value to +test+ in your +config/cable.yml+ file. + # + # NOTE: Test adapter extends the <tt>ActionCable::SubscriptionsAdapter::Async</tt> adapter, + # so it could be used in system tests too. + class Test < Async + def broadcast(channel, payload) + broadcasts(channel) << payload + super + end + + def broadcasts(channel) + channels_data[channel] ||= [] + end + + def clear_messages(channel) + channels_data[channel] = [] + end + + def clear + @channels_data = nil + end + + private + def channels_data + @channels_data ||= {} + end + end + end +end diff --git a/actioncable/lib/action_cable/test_case.rb b/actioncable/lib/action_cable/test_case.rb new file mode 100644 index 0000000000..d153259bf6 --- /dev/null +++ b/actioncable/lib/action_cable/test_case.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require "active_support/test_case" + +module ActionCable + class TestCase < ActiveSupport::TestCase + include ActionCable::TestHelper + + ActiveSupport.run_load_hooks(:action_cable_test_case, self) + end +end diff --git a/actioncable/lib/action_cable/test_helper.rb b/actioncable/lib/action_cable/test_helper.rb new file mode 100644 index 0000000000..26b849a273 --- /dev/null +++ b/actioncable/lib/action_cable/test_helper.rb @@ -0,0 +1,133 @@ +# frozen_string_literal: true + +module ActionCable + # Provides helper methods for testing Action Cable broadcasting + module TestHelper + def before_setup # :nodoc: + server = ActionCable.server + test_adapter = ActionCable::SubscriptionAdapter::Test.new(server) + + @old_pubsub_adapter = server.pubsub + + server.instance_variable_set(:@pubsub, test_adapter) + super + end + + def after_teardown # :nodoc: + super + ActionCable.server.instance_variable_set(:@pubsub, @old_pubsub_adapter) + end + + # Asserts that the number of broadcasted messages to the stream matches the given number. + # + # def test_broadcasts + # assert_broadcasts 'messages', 0 + # ActionCable.server.broadcast 'messages', { text: 'hello' } + # assert_broadcasts 'messages', 1 + # ActionCable.server.broadcast 'messages', { text: 'world' } + # assert_broadcasts 'messages', 2 + # end + # + # If a block is passed, that block should cause the specified number of + # messages to be broadcasted. + # + # def test_broadcasts_again + # assert_broadcasts('messages', 1) do + # ActionCable.server.broadcast 'messages', { text: 'hello' } + # end + # + # assert_broadcasts('messages', 2) do + # ActionCable.server.broadcast 'messages', { text: 'hi' } + # ActionCable.server.broadcast 'messages', { text: 'how are you?' } + # end + # end + # + def assert_broadcasts(stream, number) + if block_given? + original_count = broadcasts_size(stream) + yield + new_count = broadcasts_size(stream) + actual_count = new_count - original_count + else + actual_count = broadcasts_size(stream) + end + + assert_equal number, actual_count, "#{number} broadcasts to #{stream} expected, but #{actual_count} were sent" + end + + # Asserts that no messages have been sent to the stream. + # + # def test_no_broadcasts + # assert_no_broadcasts 'messages' + # ActionCable.server.broadcast 'messages', { text: 'hi' } + # assert_broadcasts 'messages', 1 + # end + # + # If a block is passed, that block should not cause any message to be sent. + # + # def test_broadcasts_again + # assert_no_broadcasts 'messages' do + # # No job messages should be sent from this block + # end + # end + # + # Note: This assertion is simply a shortcut for: + # + # assert_broadcasts 'messages', 0, &block + # + def assert_no_broadcasts(stream, &block) + assert_broadcasts stream, 0, &block + end + + # Asserts that the specified message has been sent to the stream. + # + # def test_assert_transmitted_message + # ActionCable.server.broadcast 'messages', text: 'hello' + # assert_broadcast_on('messages', text: 'hello') + # end + # + # If a block is passed, that block should cause a message with the specified data to be sent. + # + # def test_assert_broadcast_on_again + # assert_broadcast_on('messages', text: 'hello') do + # ActionCable.server.broadcast 'messages', text: 'hello' + # end + # end + # + def assert_broadcast_on(stream, data) + # Encode to JSON and back–we want to use this value to compare + # with decoded JSON. + # Comparing JSON strings doesn't work due to the order if the keys. + serialized_msg = + ActiveSupport::JSON.decode(ActiveSupport::JSON.encode(data)) + + new_messages = broadcasts(stream) + if block_given? + old_messages = new_messages + clear_messages(stream) + + yield + new_messages = broadcasts(stream) + clear_messages(stream) + + # Restore all sent messages + (old_messages + new_messages).each { |m| pubsub_adapter.broadcast(stream, m) } + end + + message = new_messages.find { |msg| ActiveSupport::JSON.decode(msg) == serialized_msg } + + assert message, "No messages sent with #{data} to #{stream}" + end + + def pubsub_adapter # :nodoc: + ActionCable.server.pubsub + end + + delegate :broadcasts, :clear_messages, to: :pubsub_adapter + + private + def broadcasts_size(channel) + broadcasts(channel).size + end + end +end diff --git a/actioncable/lib/rails/generators/channel/USAGE b/actioncable/lib/rails/generators/channel/USAGE index dd109fda80..bb5dd7e2db 100644 --- a/actioncable/lib/rails/generators/channel/USAGE +++ b/actioncable/lib/rails/generators/channel/USAGE @@ -1,14 +1,13 @@ Description: ============ - Stubs out a new cable channel for the server (in Ruby) and client (in CoffeeScript). + Stubs out a new cable channel for the server (in Ruby) and client (in JavaScript). Pass the channel name, either CamelCased or under_scored, and an optional list of channel actions as arguments. - Note: Turn on the cable connection in app/assets/javascripts/cable.js after generating any channels. - Example: ======== rails generate channel Chat speak - creates a Chat channel class and CoffeeScript asset: + creates a Chat channel class, test and JavaScript asset: Channel: app/channels/chat_channel.rb - Assets: app/assets/javascripts/channels/chat.coffee + Test: test/channels/chat_channel_test.rb + Assets: app/javascript/channels/chat_channel.js diff --git a/actioncable/lib/rails/generators/channel/channel_generator.rb b/actioncable/lib/rails/generators/channel/channel_generator.rb index c3528370c6..0b80d1f96b 100644 --- a/actioncable/lib/rails/generators/channel/channel_generator.rb +++ b/actioncable/lib/rails/generators/channel/channel_generator.rb @@ -11,15 +11,18 @@ module Rails check_class_collision suffix: "Channel" + hook_for :test_framework + def create_channel_file template "channel.rb", File.join("app/channels", class_path, "#{file_name}_channel.rb") if options[:assets] if behavior == :invoke - template "assets/cable.js", "app/assets/javascripts/cable.js" + template "javascript/index.js", "app/javascript/channels/index.js" + template "javascript/consumer.js", "app/javascript/channels/consumer.js" end - js_template "assets/channel", File.join("app/assets/javascripts/channels", class_path, "#{file_name}") + js_template "javascript/channel", File.join("app/javascript/channels", class_path, "#{file_name}_channel") end generate_application_cable_files @@ -27,7 +30,7 @@ module Rails private def file_name - @_file_name ||= super.gsub(/_channel/i, "") + @_file_name ||= super.sub(/_channel\z/i, "") end # FIXME: Change these files to symlinks once RubyGems 2.5.0 is required. diff --git a/actioncable/lib/rails/generators/channel/templates/assets/channel.coffee.tt b/actioncable/lib/rails/generators/channel/templates/assets/channel.coffee.tt deleted file mode 100644 index 5467811aba..0000000000 --- a/actioncable/lib/rails/generators/channel/templates/assets/channel.coffee.tt +++ /dev/null @@ -1,14 +0,0 @@ -App.<%= class_name.underscore %> = App.cable.subscriptions.create "<%= class_name %>Channel", - connected: -> - # Called when the subscription is ready for use on the server - - disconnected: -> - # Called when the subscription has been terminated by the server - - received: (data) -> - # Called when there's incoming data on the websocket for this channel -<% actions.each do |action| -%> - - <%= action %>: -> - @perform '<%= action %>' -<% end -%> diff --git a/actioncable/lib/rails/generators/channel/templates/assets/channel.js.tt b/actioncable/lib/rails/generators/channel/templates/javascript/channel.js.tt index ab0e68b11a..ddf6b2d79b 100644 --- a/actioncable/lib/rails/generators/channel/templates/assets/channel.js.tt +++ b/actioncable/lib/rails/generators/channel/templates/javascript/channel.js.tt @@ -1,13 +1,15 @@ -App.<%= class_name.underscore %> = App.cable.subscriptions.create("<%= class_name %>Channel", { - connected: function() { +import consumer from "./consumer" + +consumer.subscriptions.create("<%= class_name %>Channel", { + connected() { // Called when the subscription is ready for use on the server }, - disconnected: function() { + disconnected() { // Called when the subscription has been terminated by the server }, - received: function(data) { + received(data) { // Called when there's incoming data on the websocket for this channel }<%= actions.any? ? ",\n" : '' %> <% actions.each do |action| -%> diff --git a/actioncable/lib/rails/generators/channel/templates/assets/cable.js.tt b/actioncable/lib/rails/generators/channel/templates/javascript/consumer.js.tt index 739aa5f022..0eceb59b18 100644 --- a/actioncable/lib/rails/generators/channel/templates/assets/cable.js.tt +++ b/actioncable/lib/rails/generators/channel/templates/javascript/consumer.js.tt @@ -1,13 +1,6 @@ // Action Cable provides the framework to deal with WebSockets in Rails. // You can generate new channels where WebSocket features live using the `rails generate channel` command. -// -//= require action_cable -//= require_self -//= require_tree ./channels -(function() { - this.App || (this.App = {}); +import { createConsumer } from "@rails/actioncable" - App.cable = ActionCable.createConsumer(); - -}).call(this); +export default createConsumer() diff --git a/actioncable/lib/rails/generators/channel/templates/javascript/index.js.tt b/actioncable/lib/rails/generators/channel/templates/javascript/index.js.tt new file mode 100644 index 0000000000..0cfcf74919 --- /dev/null +++ b/actioncable/lib/rails/generators/channel/templates/javascript/index.js.tt @@ -0,0 +1,5 @@ +// Load all the channels within this directory and all subdirectories. +// Channel files must be named *_channel.js. + +const channels = require.context('.', true, /_channel\.js$/) +channels.keys().forEach(channels) diff --git a/actioncable/lib/rails/generators/test_unit/channel_generator.rb b/actioncable/lib/rails/generators/test_unit/channel_generator.rb new file mode 100644 index 0000000000..7d13a12f0a --- /dev/null +++ b/actioncable/lib/rails/generators/test_unit/channel_generator.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module TestUnit + module Generators + class ChannelGenerator < ::Rails::Generators::NamedBase + source_root File.expand_path("templates", __dir__) + + check_class_collision suffix: "ChannelTest" + + def create_test_files + template "channel_test.rb", File.join("test/channels", class_path, "#{file_name}_channel_test.rb") + end + + private + def file_name # :doc: + @_file_name ||= super.sub(/_channel\z/i, "") + end + end + end +end diff --git a/actioncable/lib/rails/generators/test_unit/templates/channel_test.rb.tt b/actioncable/lib/rails/generators/test_unit/templates/channel_test.rb.tt new file mode 100644 index 0000000000..7307654611 --- /dev/null +++ b/actioncable/lib/rails/generators/test_unit/templates/channel_test.rb.tt @@ -0,0 +1,8 @@ +require "test_helper" + +class <%= class_name %>ChannelTest < ActionCable::Channel::TestCase + # test "subscribes" do + # subscribe + # assert subscription.confirmed? + # end +end |