aboutsummaryrefslogtreecommitdiffstats
path: root/lib/action_cable/channel/base.rb
blob: 6c55a8ed65481d8b365238bc3c0c86e9c646c9a8 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
module ActionCable
  module Channel
    class Base
      include Callbacks
      include Redis

      on_subscribe   :start_periodic_timers
      on_unsubscribe :stop_periodic_timers

      attr_reader :params, :connection
      delegate :logger, to: :connection

      class << self
        def matches?(identifier)
          raise "Please implement #{name}#matches? method"
        end
      end

      def initialize(connection, channel_identifier, params = {})
        @connection = connection
        @channel_identifier = channel_identifier
        @_active_periodic_timers = []
        @params = params

        perform_connection
      end

      def perform_connection
        logger.info "#{channel_name} connecting"
        connect
        run_subscribe_callbacks
      end

      def perform_action(data)
        if authorized?
          action = extract_action(data)

          if performable_action?(action)
            logger.info channel_name + compose_signature(action, data)
            public_send action, data
          else
            logger.error "#{channel_name} failed to process #{compose_signature(action, data)}"
          end
        else
          unauthorized
        end
      end

      def perform_disconnection
        disconnect
        run_unsubscribe_callbacks
        logger.info "#{channel_name} disconnected"
      end


      protected
        # Override in subclasses
        def authorized?
          true
        end

        def unauthorized
          logger.error "#{channel_name}: Unauthorized access"
        end


        def connect
          # Override in subclasses
        end

        def disconnect
          # Override in subclasses
        end


        def transmit(data, via: nil)
          if authorized?
            logger.info "#{channel_name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
            connection.transmit({ identifier: @channel_identifier, message: data }.to_json)
          else
            unauthorized
          end
        end


        def channel_name
          self.class.name
        end

      private
        def extract_action(data)
          (data['action'].presence || :receive).to_sym
        end

        def performable_action?(action)
          self.class.instance_methods(false).include?(action)
        end

        def compose_signature(action, data)
          "##{action}".tap do |signature|
            if (arguments = data.except('action')).any?
              signature << "(#{arguments.inspect})"
            end
          end
        end


        def run_subscribe_callbacks
          self.class.on_subscribe_callbacks.each { |callback| send(callback) }
        end

        def run_unsubscribe_callbacks
          self.class.on_unsubscribe_callbacks.each { |callback| send(callback) }
        end


        def start_periodic_timers
          self.class.periodic_timers.each do |callback, options|
            @_active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
              worker_pool.async.run_periodic_timer(self, callback)
            end
          end
        end

        def stop_periodic_timers
          @_active_periodic_timers.each { |timer| timer.cancel }
        end


        def worker_pool
          connection.worker_pool
        end
    end
  end
end