diff options
18 files changed, 245 insertions, 154 deletions
diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index 1ea862b389..d59e48e00c 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,11 +1,16 @@ -* Add ActiveSupport::Notifications to ActionCable::Channel. +* Pubsub: automatic stream decoding. - *Matthew Wear* + stream_for @room, coder: ActiveSupport::JSON do |message| + # `message` is a Ruby hash here instead of a JSON string -* Allow channel identifiers with no backslahes/escaping to be accepted - by the subscription storer. + The `coder` must respond to `#decode`. Defaults to `coder: nil` + which skips decoding entirely. - *Jon Moss* + *Jeremy Daer* + +* Add ActiveSupport::Notifications to ActionCable::Channel. + + *Matthew Wear* * Safely support autoloading and class unloading, by preventing concurrent loads, and disconnecting all cables during reload. diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index 464d0581dd..845b747fc5 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -198,7 +198,7 @@ module ActionCable payload = { channel_class: self.class.name, data: data, via: via } ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data) + connection.transmit identifier: @identifier, message: data end end @@ -274,7 +274,7 @@ module ActionCable logger.info "#{self.class.name} is transmitting the subscription confirmation" ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]) + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation] @subscription_confirmation_sent = true end end @@ -289,7 +289,7 @@ module ActionCable logger.info "#{self.class.name} is transmitting the subscription rejection" ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do - connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]) + connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection] end end end diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 23d7320a28..f654ce0bfa 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -46,9 +46,7 @@ module ActionCable # def subscribed # @room = Chat::Room[params[:room_number]] # - # stream_for @room, -> (encoded_message) do - # message = ActiveSupport::JSON.decode(encoded_message) - # + # stream_for @room, coder: ActiveSupport::JSON do |message| # if message['originated_at'].present? # elapsed_time = (Time.now.to_f - message['originated_at']).round(2) # @@ -71,16 +69,23 @@ module ActionCable # Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used # instead of the default of just transmitting the updates straight to the subscriber. - def stream_from(broadcasting, callback = nil) + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - callback ||= default_stream_callback(broadcasting) - streams << [ broadcasting, callback ] + if handler = callback || block + handler = -> message { handler.(coder.decode(message)) } if coder + else + handler = default_stream_handler(broadcasting, coder: coder) + end + + streams << [ broadcasting, handler ] connection.server.event_loop.post do - pubsub.subscribe(broadcasting, callback, lambda do + pubsub.subscribe(broadcasting, handler, lambda do transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -90,8 +95,11 @@ module ActionCable # Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a # <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight # to the subscriber. - def stream_for(model, callback = nil) - stream_from(broadcasting_for([ channel_name, model ]), callback) + # + # Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback. + # Defaults to `coder: nil` which does no decoding, passes raw messages. + def stream_for(model, callback = nil, coder: nil, &block) + stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder) end # Unsubscribes all streams associated with this channel from the pubsub queue. @@ -109,9 +117,11 @@ module ActionCable @_streams ||= [] end - def default_stream_callback(broadcasting) + def default_stream_handler(broadcasting, coder:) + coder ||= ActiveSupport::JSON + -> (message) do - transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}" + transmit coder.decode(message), via: "streamed from #{broadcasting}" end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index b4488265cb..604a889bb0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -51,8 +51,8 @@ module ActionCable attr_reader :server, :env, :subscriptions, :logger, :worker_pool delegate :event_loop, :pubsub, to: :server - def initialize(server, env) - @server, @env = server, env + def initialize(server, env, coder: ActiveSupport::JSON) + @server, @env, @coder = server, env, coder @worker_pool = server.worker_pool @logger = new_tagged_logger @@ -67,7 +67,7 @@ module ActionCable # Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user. # This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks. - def process # :nodoc: + def process #:nodoc: logger.info started_request_message if websocket.possible? && allow_request_origin? @@ -77,20 +77,22 @@ module ActionCable end end - # Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded. - # The data is routed to the proper channel that the connection has subscribed to. - def receive(data_in_json) + # Decodes WebSocket messages and dispatches them to subscribed channels. + # WebSocket message transfer encoding is always JSON. + def receive(websocket_message) #:nodoc: + send_async :dispatch_websocket_message, websocket_message + end + + def dispatch_websocket_message(websocket_message) #:nodoc: if websocket.alive? - subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json) + subscriptions.execute_command decode(websocket_message) else - logger.error "Received data without a live WebSocket (#{data_in_json.inspect})" + logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})" end end - # Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the - # Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON. - def transmit(data) # :nodoc: - websocket.transmit data + def transmit(cable_message) # :nodoc: + websocket.transmit encode(cable_message) end # Close the WebSocket connection. @@ -115,7 +117,7 @@ module ActionCable end def beat - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i) + transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i end def on_open # :nodoc: @@ -152,6 +154,14 @@ module ActionCable attr_reader :message_buffer private + def encode(cable_message) + @coder.encode cable_message + end + + def decode(websocket_message) + @coder.decode websocket_message + end + def handle_open connect if respond_to?(:connect) subscribe_to_internal_channel @@ -178,7 +188,7 @@ module ActionCable # Send welcome message to the internal connection monitor channel. # This ensures the connection monitor state is reset after a successful # websocket connection. - transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome]) + transmit type: ActionCable::INTERNAL[:message_types][:welcome] end def allow_request_origin? diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 3c5d39f59a..f70d52f99b 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -11,7 +11,7 @@ module ActionCable def subscribe_to_internal_channel if connection_identifier.present? - callback = -> (message) { process_internal_message(message) } + callback = -> (message) { process_internal_message decode(message) } @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] @@ -27,8 +27,6 @@ module ActionCable end def process_internal_message(message) - message = ActiveSupport::JSON.decode(message) - case message['type'] when 'disconnect' logger.info "Removing connection (#{connection_identifier})" diff --git a/actioncable/lib/action_cable/connection/message_buffer.rb b/actioncable/lib/action_cable/connection/message_buffer.rb index 19f2e6e918..6a80770cae 100644 --- a/actioncable/lib/action_cable/connection/message_buffer.rb +++ b/actioncable/lib/action_cable/connection/message_buffer.rb @@ -30,7 +30,7 @@ module ActionCable protected attr_reader :connection - attr_accessor :buffered_messages + attr_reader :buffered_messages private def valid?(message) @@ -38,7 +38,7 @@ module ActionCable end def receive(message) - connection.send_async :receive, message + connection.receive message end def buffer(message) diff --git a/actioncable/lib/action_cable/connection/subscriptions.rb b/actioncable/lib/action_cable/connection/subscriptions.rb index 5aa907c2d3..3742f248d1 100644 --- a/actioncable/lib/action_cable/connection/subscriptions.rb +++ b/actioncable/lib/action_cable/connection/subscriptions.rb @@ -23,13 +23,13 @@ module ActionCable end def add(data) - id_options = decode_hash(data['identifier']) - identifier = normalize_identifier(id_options) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access subscription_klass = connection.server.channel_classes[id_options[:channel]] if subscription_klass - subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options) + subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) else logger.error "Subscription class not found (#{data.inspect})" end @@ -37,7 +37,7 @@ module ActionCable def remove(data) logger.info "Unsubscribing from channel: #{data['identifier']}" - remove_subscription subscriptions[normalize_identifier(data['identifier'])] + remove_subscription subscriptions[data['identifier']] end def remove_subscription(subscription) @@ -46,7 +46,7 @@ module ActionCable end def perform_action(data) - find(data).perform_action(decode_hash(data['data'])) + find(data).perform_action ActiveSupport::JSON.decode(data['data']) end def identifiers @@ -63,21 +63,8 @@ module ActionCable private delegate :logger, to: :connection - def normalize_identifier(identifier) - identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash) - identifier - end - - # If `data` is a Hash, this means that the original JSON - # sent by the client had no backslashes in it, and does - # not need to be decoded again. - def decode_hash(data) - data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash) - data.with_indifferent_access - end - def find(data) - if subscription = subscriptions[normalize_identifier(data['identifier'])] + if subscription = subscriptions[data['identifier']] subscription else raise "Unable to find subscription with identifier: #{data['identifier']}" diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 98025f27f2..8f93564113 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -19,27 +19,28 @@ module ActionCable # new Notification data['title'], body: data['body'] module Broadcasting # Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded. - def broadcast(broadcasting, message) - broadcaster_for(broadcasting).broadcast(message) + def broadcast(broadcasting, message, coder: ActiveSupport::JSON) + broadcaster_for(broadcasting, coder: coder).broadcast(message) end # Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that # may need multiple spots to transmit to a specific broadcasting over and over. - def broadcaster_for(broadcasting) - Broadcaster.new(self, String(broadcasting)) + def broadcaster_for(broadcasting, coder: ActiveSupport::JSON) + Broadcaster.new(self, String(broadcasting), coder: coder) end private class Broadcaster - attr_reader :server, :broadcasting + attr_reader :server, :broadcasting, :coder - def initialize(server, broadcasting) - @server, @broadcasting = server, broadcasting + def initialize(server, broadcasting, coder:) + @server, @broadcasting, @coder = server, broadcasting, coder end def broadcast(message) - server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}" + encoded = coder ? coder.encode(message) : message + server.pubsub.broadcast broadcasting, encoded end end end diff --git a/actioncable/test/channel/base_test.rb b/actioncable/test/channel/base_test.rb index bed54eb6b3..daa782eeb3 100644 --- a/actioncable/test/channel/base_test.rb +++ b/actioncable/test/channel/base_test.rb @@ -146,12 +146,12 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase test "transmitting data" do @channel.perform_action 'action' => :get_latest - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" } + expected = { "identifier" => "{id: 1}", "message" => { "data" => "latest" }} assert_equal expected, @connection.last_transmission end test "subscription confirmation" do - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } assert_equal expected, @connection.last_transmission end diff --git a/actioncable/test/channel/rejection_test.rb b/actioncable/test/channel/rejection_test.rb index aa93396d44..15db57d6ba 100644 --- a/actioncable/test/channel/rejection_test.rb +++ b/actioncable/test/channel/rejection_test.rb @@ -18,7 +18,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @channel = SecretChannel.new @connection, "{id: 1}", { id: 1 } - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription" + expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } assert_equal expected, @connection.last_transmission end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 526ea92e4f..f51f19eb7d 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -2,18 +2,43 @@ require 'test_helper' require 'stubs/test_connection' require 'stubs/room' -class ActionCable::Channel::StreamTest < ActionCable::TestCase +module ActionCable::StreamTests + class Connection < ActionCable::Connection::Base + attr_reader :websocket + + def send_async(method, *args) + send method, *args + end + end + class ChatChannel < ActionCable::Channel::Base def subscribed if params[:id] @room = Room.new params[:id] - stream_from "test_room_#{@room.id}" + stream_from "test_room_#{@room.id}", coder: pick_coder(params[:coder]) end end def send_confirmation transmit_subscription_confirmation end + + private def pick_coder(coder) + case coder + when nil, 'json' + ActiveSupport::JSON + when 'custom' + DummyEncoder + when 'none' + nil + end + end + end + + module DummyEncoder + extend self + def encode(*) '{ "foo": "encoded" }' end + def decode(*) { foo: 'decoded' } end end class SymbolChannel < ActionCable::Channel::Base @@ -22,69 +47,114 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase end end - test "streaming start and stop" do - run_in_eventmachine do - connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = ChatChannel.new connection, "{id: 1}", { id: 1 } + class StreamTest < ActionCable::TestCase + test "streaming start and stop" do + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } - channel.unsubscribe_from_channel + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } + channel.unsubscribe_from_channel + end end - end - test "stream from non-string channel" do - run_in_eventmachine do - connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = SymbolChannel.new connection, "" + test "stream from non-string channel" do + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + channel = SymbolChannel.new connection, "" - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } - channel.unsubscribe_from_channel + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } + channel.unsubscribe_from_channel + end end - end - test "stream_for" do - run_in_eventmachine do - connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } + test "stream_for" do + run_in_eventmachine do + connection = TestConnection.new + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } - channel = ChatChannel.new connection, "" - channel.stream_for Room.new(1) + channel = ChatChannel.new connection, "" + channel.stream_for Room.new(1) + end end - end - test "stream_from subscription confirmation" do - run_in_eventmachine do - connection = TestConnection.new + test "stream_from subscription confirmation" do + run_in_eventmachine do + connection = TestConnection.new - ChatChannel.new connection, "{id: 1}", { id: 1 } - assert_nil connection.last_transmission + ChatChannel.new connection, "{id: 1}", { id: 1 } + assert_nil connection.last_transmission - wait_for_async + wait_for_async - expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" - connection.transmit(expected) + confirmation = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } + connection.transmit(confirmation) - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + assert_equal confirmation, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" + end end - end - test "subscription confirmation should only be sent out once" do - run_in_eventmachine do - connection = TestConnection.new + test "subscription confirmation should only be sent out once" do + run_in_eventmachine do + connection = TestConnection.new - channel = ChatChannel.new connection, "test_channel" - channel.send_confirmation - channel.send_confirmation + channel = ChatChannel.new connection, "test_channel" + channel.send_confirmation + channel.send_confirmation - wait_for_async + wait_for_async - expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription" - assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" + expected = { "identifier" => "test_channel", "type" => "confirm_subscription" } + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation" - assert_equal 1, connection.transmissions.size + assert_equal 1, connection.transmissions.size + end end end + require 'action_cable/subscription_adapter/inline' + + class StreamEncodingTest < ActionCable::TestCase + setup do + @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + @server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel) + end + + test 'custom encoder' do + run_in_eventmachine do + connection = open_connection + subscribe_to connection, identifiers: { id: 1 } + + connection.websocket.expects(:transmit) + @server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder + wait_for_async + end + end + + private + def subscribe_to(connection, identifiers:) + receive connection, command: 'subscribe', identifiers: identifiers + end + + def open_connection + env = Rack::MockRequest.env_for '/test', 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'HTTP_ORIGIN' => 'http://rubyonrails.com' + + Connection.new(@server, env).tap do |connection| + connection.process + assert connection.websocket.possible? + + wait_for_async + assert connection.websocket.alive? + end + end + + def receive(connection, command:, identifiers:) + identifier = JSON.generate(channel: 'ActionCable::StreamTests::ChatChannel', **identifiers) + connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier) + wait_for_async + end + end end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 5f5c09d1a1..5ac453db35 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -75,11 +75,11 @@ class ClientTest < ActionCable::TestCase end @ws.on(:message) do |event| - hash = JSON.parse(event.data) - if hash['type'] == 'ping' + message = JSON.parse(event.data) + if message['type'] == 'ping' @pings += 1 else - @messages << hash + @messages << message @has_messages.release end end @@ -116,8 +116,8 @@ class ClientTest < ActionCable::TestCase list end - def send_message(hash) - @ws.send(JSON.dump(hash)) + def send_message(message) + @ws.send(JSON.generate(message)) end def close @@ -148,9 +148,9 @@ class ClientTest < ActionCable::TestCase with_puma_server do |port| c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) c.close end @@ -165,12 +165,12 @@ class ClientTest < ActionCable::TestCase clients.map {|c| Concurrent::Future.execute { assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) barrier_1.wait WAIT_WHEN_EXPECTING_EVENT - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello') barrier_2.wait WAIT_WHEN_EXPECTING_EVENT assert_equal clients.size, c.read_messages(clients.size).size } }.each(&:wait!) @@ -185,9 +185,9 @@ class ClientTest < ActionCable::TestCase clients.map {|c| Concurrent::Future.execute { assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) } }.each(&:wait!) @@ -199,16 +199,16 @@ class ClientTest < ActionCable::TestCase with_puma_server do |port| c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello') c.close # disappear before write c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) - c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello') + c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) c.close # disappear before read end @@ -217,7 +217,7 @@ class ClientTest < ActionCable::TestCase def test_unsubscribe_client with_puma_server do |port| app = ActionCable.server - identifier = JSON.dump(channel: 'EchoChannel') + identifier = JSON.generate(channel: 'EchoChannel') c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) @@ -240,7 +240,7 @@ class ClientTest < ActionCable::TestCase with_puma_server do |port| c = faye_client(port) assert_equal({"type" => "welcome"}, c.read_message) - c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel') + c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) ActionCable.server.restart diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index c3d5f1f90b..b48d9af809 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -40,8 +40,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase open_connection_with_stubbed_pubsub @connection.websocket.expects(:close) - message = ActiveSupport::JSON.encode('type' => 'disconnect') - @connection.process_internal_message message + @connection.process_internal_message 'type' => 'disconnect' end end @@ -50,8 +49,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase open_connection_with_stubbed_pubsub @connection.websocket.expects(:close).never - message = ActiveSupport::JSON.encode('type' => 'unknown') - @connection.process_internal_message message + @connection.process_internal_message 'type' => 'unknown' end end diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb index f91597f567..53e8547245 100644 --- a/actioncable/test/connection/subscriptions_test.rb +++ b/actioncable/test/connection/subscriptions_test.rb @@ -88,7 +88,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase channel1 = subscribe_to_chat_channel - channel2_id = ActiveSupport::JSON.encode({ id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel' }) + channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') channel2 = subscribe_to_chat_channel(channel2_id) channel1.expects(:unsubscribe_from_channel) diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index 8ba284fdc6..885450dda6 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -3,24 +3,31 @@ require 'stubs/user' class TestConnection attr_reader :identifiers, :logger, :current_user, :server, :transmissions - def initialize(user = User.new("lifo")) + delegate :pubsub, to: :server + + def initialize(user = User.new("lifo"), coder: ActiveSupport::JSON, subscription_adapter: SuccessAdapter) + @coder = coder @identifiers = [ :current_user ] @current_user = user @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) - @server = TestServer.new + @server = TestServer.new(subscription_adapter: subscription_adapter) @transmissions = [] end - def pubsub - SuccessAdapter.new(server) + def transmit(cable_message) + @transmissions << encode(cable_message) end - def transmit(data) - @transmissions << data + def last_transmission + decode @transmissions.last if @transmissions.any? end - def last_transmission - @transmissions.last + def decode(websocket_message) + @coder.decode websocket_message + end + + def encode(cable_message) + @coder.encode cable_message end end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 9e860825f3..b86f422a13 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -2,22 +2,26 @@ require 'ostruct' class TestServer include ActionCable::Server::Connections + include ActionCable::Server::Broadcasting - attr_reader :logger, :config + attr_reader :logger, :config, :mutex - def initialize + def initialize(subscription_adapter: SuccessAdapter) @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) - @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter) + + @config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter) @config.use_faye = ENV['FAYE'].present? @config.client_socket_class = if @config.use_faye ActionCable::Connection::FayeClientSocket else ActionCable::Connection::ClientSocket end + + @mutex = Monitor.new end def pubsub - @config.subscription_adapter.new(self) + @pubsub ||= @config.subscription_adapter.new(self) end def event_loop diff --git a/actioncable/test/subscription_adapter/common.rb b/actioncable/test/subscription_adapter/common.rb index 82f0abbbf3..285c690df0 100644 --- a/actioncable/test/subscription_adapter/common.rb +++ b/actioncable/test/subscription_adapter/common.rb @@ -20,8 +20,7 @@ module CommonSubscriptionAdapterTest end def teardown - @tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter - @rx_adapter.shutdown if @rx_adapter + [@rx_adapter, @tx_adapter].uniq.each(&:shutdown) end diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 030362d512..de1ee96770 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -2,11 +2,13 @@ require 'action_cable' require 'active_support/testing/autorun' require 'puma' - require 'mocha/setup' - require 'rack/mock' -require 'active_support/core_ext/hash/indifferent_access' + +begin + require 'byebug' +rescue LoadError +end # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } |