diff options
Diffstat (limited to 'lib/action_cable')
-rw-r--r-- | lib/action_cable/channel.rb | 6 | ||||
-rw-r--r-- | lib/action_cable/channel/base.rb | 64 | ||||
-rw-r--r-- | lib/action_cable/channel/callbacks.rb | 32 | ||||
-rw-r--r-- | lib/action_cable/server.rb | 73 |
4 files changed, 175 insertions, 0 deletions
diff --git a/lib/action_cable/channel.rb b/lib/action_cable/channel.rb new file mode 100644 index 0000000000..a54302d30f --- /dev/null +++ b/lib/action_cable/channel.rb @@ -0,0 +1,6 @@ +module ActionCable + module Channel + autoload :Callbacks, 'action_cable/channel/callbacks' + autoload :Base, 'action_cable/channel/base' + end +end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb new file mode 100644 index 0000000000..82c1a14b49 --- /dev/null +++ b/lib/action_cable/channel/base.rb @@ -0,0 +1,64 @@ +module ActionCable + module Channel + + class Base + include Callbacks + + on_subscribe :start_periodic_timers + on_unsubscribe :stop_periodic_timers + + attr_reader :params + + class << self + def matches?(identifier) + raise "Please implement #{name}#matches? method" + end + end + + def initialize(connection, channel_identifier, params = {}) + @connection = connection + @channel_identifier = channel_identifier + @_active_periodic_timers = [] + @params = params + + setup + end + + def receive(data) + raise "Not implemented" + end + + def subscribe + self.class.on_subscribe_callbacks.each do |callback| + EM.next_tick { send(callback) } + end + end + + def unsubscribe + self.class.on_unsubscribe.each do |callback| + EM.next_tick { send(callback) } + end + end + + protected + def setup + # Override in subclasses + end + + def publish(data) + @connection.publish(data.merge(identifier: @channel_identifier).to_json) + end + + def start_periodic_timers + self.class.periodic_timers.each do |method, options| + @_active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) { send(method) } + end + end + + def stop_periodic_timers + @_active_periodic_timers.each {|t| t.cancel } + end + end + + end +end
\ No newline at end of file diff --git a/lib/action_cable/channel/callbacks.rb b/lib/action_cable/channel/callbacks.rb new file mode 100644 index 0000000000..cf0246a386 --- /dev/null +++ b/lib/action_cable/channel/callbacks.rb @@ -0,0 +1,32 @@ +module ActionCable + module Channel + + module Callbacks + extend ActiveSupport::Concern + + included do + class_attribute :on_subscribe_callbacks, :on_unsubscribe_callbacks, :periodic_timers, :instance_reader => false + + self.on_subscribe_callbacks = [] + self.on_unsubscribe_callbacks = [] + self.periodic_timers = [] + end + + module ClassMethods + def on_subscribe(*methods) + self.on_subscribe_callbacks += methods + end + + def on_unsubscribe(*methods) + self.on_unsubscribe_callbacks += methods + end + + def periodic_timer(method, every:) + self.periodic_timers += [ [ method, every: every ] ] + end + end + + end + + end +end
\ No newline at end of file diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb new file mode 100644 index 0000000000..2d80e96265 --- /dev/null +++ b/lib/action_cable/server.rb @@ -0,0 +1,73 @@ +require 'set' + +module ActionCable + class Server < Cramp::Websocket + on_start :initialize_subscriptions + on_data :received_data + on_finish :cleanup_subscriptions + + class_attribute :registered_channels + self.registered_channels = Set.new + + class << self + def register_channels(*channel_classes) + registered_channels.merge(channel_classes) + end + end + + def initialize_subscriptions + @subscriptions = {} + end + + def received_data(data) + data = ActiveSupport::JSON.decode data + + case data['action'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + end + end + + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.unsubscribe + end + end + + def publish(data) + render data + end + + private + def subscribe_channel(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + if subscription = registered_channels.detect { |channel_klass| channel_klass.matches?(id_options) } + @subscriptions[id_key] = subscription.new(self, id_key, id_options) + @subscriptions[id_key].subscribe + else + # No channel found + end + end + + def process_message(message) + id_key = message['identifier'] + + if @subscriptions[id_key] + @subscriptions[id_key].receive(ActiveSupport::JSON.decode message['data']) + end + end + + def unsubscribe_channel(data) + id_key = data['identifier'] + @subscriptions[id_key].unsubscribe + @subscriptions.delete(id_key) + end + + end +end |