aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb59
1 files changed, 59 insertions, 0 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
new file mode 100644
index 0000000000..01cdc2dfa1
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/subscriber_map.rb
@@ -0,0 +1,59 @@
+# frozen_string_literal: true
+
+module ActionCable
+ module SubscriptionAdapter
+ class SubscriberMap
+ def initialize
+ @subscribers = Hash.new { |h, k| h[k] = [] }
+ @sync = Mutex.new
+ end
+
+ def add_subscriber(channel, subscriber, on_success)
+ @sync.synchronize do
+ new_channel = !@subscribers.key?(channel)
+
+ @subscribers[channel] << subscriber
+
+ if new_channel
+ add_channel channel, on_success
+ elsif on_success
+ on_success.call
+ end
+ end
+ end
+
+ def remove_subscriber(channel, subscriber)
+ @sync.synchronize do
+ @subscribers[channel].delete(subscriber)
+
+ if @subscribers[channel].empty?
+ @subscribers.delete channel
+ remove_channel channel
+ end
+ end
+ end
+
+ def broadcast(channel, message)
+ list = @sync.synchronize do
+ return if !@subscribers.key?(channel)
+ @subscribers[channel].dup
+ end
+
+ list.each do |subscriber|
+ invoke_callback(subscriber, message)
+ end
+ end
+
+ def add_channel(channel, on_success)
+ on_success.call if on_success
+ end
+
+ def remove_channel(channel)
+ end
+
+ def invoke_callback(callback, message)
+ callback.call message
+ end
+ end
+ end
+end