aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/test/channel/stream_test.rb
blob: eca06fe365c822f7777c8b8e020a4ba529442293 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
# frozen_string_literal: true

require "test_helper"
require "stubs/test_connection"
require "stubs/room"

module ActionCable::StreamTests
  class Connection < ActionCable::Connection::Base
    attr_reader :websocket

    def send_async(method, *args)
      send method, *args
    end
  end

  class ChatChannel < ActionCable::Channel::Base
    def subscribed
      if params[:id]
        @room = Room.new params[:id]
        stream_from "test_room_#{@room.id}", coder: pick_coder(params[:coder])
      end
    end

    def send_confirmation
      transmit_subscription_confirmation
    end

    private def pick_coder(coder)
      case coder
      when nil, "json"
        ActiveSupport::JSON
      when "custom"
        DummyEncoder
      when "none"
        nil
      end
    end
  end

  module DummyEncoder
    extend self
    def encode(*) '{ "foo": "encoded" }' end
    def decode(*) { foo: "decoded" } end
  end

  class SymbolChannel < ActionCable::Channel::Base
    def subscribed
      stream_from :channel
    end
  end

  class StreamTest < ActionCable::TestCase
    test "streaming start and stop" do
      run_in_eventmachine do
        connection = TestConnection.new
        connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
        channel = ChatChannel.new connection, "{id: 1}", id: 1
        channel.subscribe_to_channel

        wait_for_async

        connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
        channel.unsubscribe_from_channel
      end
    end

    test "stream from non-string channel" do
      run_in_eventmachine do
        connection = TestConnection.new
        connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
        channel = SymbolChannel.new connection, ""
        channel.subscribe_to_channel

        wait_for_async

        connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
        channel.unsubscribe_from_channel
      end
    end

    test "stream_for" do
      run_in_eventmachine do
        connection = TestConnection.new
        connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }

        channel = ChatChannel.new connection, ""
        channel.subscribe_to_channel
        channel.stream_for Room.new(1)
      end
    end

    test "stream_from subscription confirmation" do
      run_in_eventmachine do
        connection = TestConnection.new

        channel = ChatChannel.new connection, "{id: 1}", id: 1
        channel.subscribe_to_channel

        assert_nil connection.last_transmission

        wait_for_async

        confirmation = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
        connection.transmit(confirmation)

        assert_equal confirmation, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
      end
    end

    test "subscription confirmation should only be sent out once" do
      run_in_eventmachine do
        connection = TestConnection.new

        channel = ChatChannel.new connection, "test_channel"
        channel.send_confirmation
        channel.send_confirmation

        wait_for_async

        expected = { "identifier" => "test_channel", "type" => "confirm_subscription" }
        assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"

        assert_equal 1, connection.transmissions.size
      end
    end
  end

  require "action_cable/subscription_adapter/async"

  class UserCallbackChannel < ActionCable::Channel::Base
    def subscribed
      stream_from :channel do
        Thread.current[:ran_callback] = true
      end
    end
  end

  class MultiChatChannel < ActionCable::Channel::Base
    def subscribed
      stream_from "main_room"
      stream_from "test_all_rooms"
    end
  end

  class StreamFromTest < ActionCable::TestCase
    setup do
      @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
      @server.config.allowed_request_origins = %w( http://rubyonrails.com )
    end

    test "custom encoder" do
      run_in_eventmachine do
        connection = open_connection
        subscribe_to connection, identifiers: { id: 1 }

        connection.websocket.expects(:transmit)
        @server.broadcast "test_room_1", { foo: "bar" }, { coder: DummyEncoder }
        wait_for_async
        wait_for_executor connection.server.worker_pool.executor
      end
    end

    test "user supplied callbacks are run through the worker pool" do
      run_in_eventmachine do
        connection = open_connection
        receive(connection, command: "subscribe", channel: UserCallbackChannel.name, identifiers: { id: 1 })

        @server.broadcast "channel", {}
        wait_for_async
        refute Thread.current[:ran_callback], "User callback was not run through the worker pool"
      end
    end

    test "subscription confirmation should only be sent out once with muptiple stream_from" do
      run_in_eventmachine do
        connection = open_connection
        expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" }
        connection.websocket.expects(:transmit).with(expected.to_json)
        receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {})

        wait_for_async
      end
    end

    private
      def subscribe_to(connection, identifiers:)
        receive connection, command: "subscribe", identifiers: identifiers
      end

      def open_connection
        env = Rack::MockRequest.env_for "/test", "HTTP_HOST" => "localhost", "HTTP_CONNECTION" => "upgrade", "HTTP_UPGRADE" => "websocket", "HTTP_ORIGIN" => "http://rubyonrails.com"

        Connection.new(@server, env).tap do |connection|
          connection.process
          assert connection.websocket.possible?

          wait_for_async
          assert connection.websocket.alive?
        end
      end

      def receive(connection, command:, identifiers:, channel: "ActionCable::StreamTests::ChatChannel")
        identifier = JSON.generate(channel: channel, **identifiers)
        connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier)
        wait_for_async
      end
  end
end