diff options
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter/redis.rb')
-rw-r--r-- | actioncable/lib/action_cable/subscription_adapter/redis.rb | 17 |
1 files changed, 11 insertions, 6 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index ba4934a264..65434f7107 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -33,25 +33,30 @@ module ActionCable end def redis_connection_for_subscriptions - ::Redis.new(@server.config.cable) + redis_connection end private def listener - @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) } end def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) + @redis_connection_for_broadcasts ||= redis_connection end end + def redis_connection + self.class.redis_connector.call(@server.config.cable) + end + class Listener < SubscriberMap - def initialize(adapter) + def initialize(adapter, event_loop) super() @adapter = adapter + @event_loop = event_loop @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } @subscription_lock = Mutex.new @@ -80,7 +85,7 @@ module ActionCable if callbacks = @subscribe_callbacks[chan] next_callback = callbacks.shift - Concurrent.global_io_executor << next_callback if next_callback + @event_loop.post(&next_callback) if next_callback @subscribe_callbacks.delete(chan) if callbacks.empty? end end @@ -129,7 +134,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + @event_loop.post { super } end private |