aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/channel/streams.rb
blob: 3eac776e617ab55eb99035575b2e09af22168daa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
module ActionCable
  module Channel
    module Streams
      extend ActiveSupport::Concern

      included do
        on_unsubscribe :stop_all_streams
      end

      def stream_from(broadcasting, callback = nil)
        callback ||= default_stream_callback(broadcasting)

        streams << [ broadcasting, callback ]
        pubsub.subscribe broadcasting, &callback

        logger.info "#{channel_name} is streaming from #{broadcasting}"
      end

      def stop_all_streams
        streams.each do |broadcasting, callback|
          pubsub.unsubscribe_proc broadcasting, callback
          logger.info "#{channel_name} stopped streaming from #{broadcasting}"
        end
      end

      private
        delegate :pubsub, to: :connection

        def streams
          @_streams ||= []
        end

        def default_stream_callback(broadcasting)
          -> (message) do
            transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
          end
        end
    end
  end
end