aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter/redis.rb
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/redis.rb')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb9
1 files changed, 5 insertions, 4 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 6b4236e7d3..65434f7107 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -38,7 +38,7 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def redis_connection_for_broadcasts
@@ -52,10 +52,11 @@ module ActionCable
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@@ -84,7 +85,7 @@ module ActionCable
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
- Concurrent.global_io_executor << next_callback if next_callback
+ @event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
@@ -133,7 +134,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
private