aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb14
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb9
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb21
4 files changed, 39 insertions, 16 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index cca6894289..10b3ac8cd8 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -5,16 +5,21 @@ module ActionCable
class Async < Inline # :nodoc:
private
def new_subscriber_map
- AsyncSubscriberMap.new
+ AsyncSubscriberMap.new(server.event_loop)
end
class AsyncSubscriberMap < SubscriberMap
+ def initialize(event_loop)
+ @event_loop = event_loop
+ super()
+ end
+
def add_subscriber(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
index d697548cbd..256876cf30 100644
--- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -13,6 +13,14 @@ module ActionCable
class EventedRedis < Base # :nodoc:
@@mutex = Mutex.new
+ # Overwrite this factory method for EventMachine Redis connections if you want to use a different Redis connection library than EM::Hiredis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } }
+
+ # Overwrite this factory method for Redis connections if you want to use a different Redis connection library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
def initialize(*)
super
@redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
@@ -41,9 +49,9 @@ module ActionCable
def redis_connection_for_subscriptions
ensure_reactor_running
@redis_connection_for_subscriptions || @server.mutex.synchronize do
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis|
redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ @logger.error "[ActionCable] Redis reconnect failed."
end
end
end
@@ -51,7 +59,7 @@ module ActionCable
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts || @server.mutex.synchronize do
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable)
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index abaeb92e54..66c7852f6e 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -42,14 +42,15 @@ module ActionCable
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@queue = Queue.new
@thread = Thread.new do
@@ -68,7 +69,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- Concurrent.global_io_executor << callback if callback
+ @event_loop.post(&callback) if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -98,7 +99,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 7076383efe..65434f7107 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -6,6 +6,10 @@ require 'redis'
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis.
+ # This is needed, for example, when using Makara proxies for distributed Redis.
+ cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } }
+
def initialize(*)
super
@listener = nil
@@ -29,25 +33,30 @@ module ActionCable
end
def redis_connection_for_subscriptions
- ::Redis.new(@server.config.cable)
+ redis_connection
end
private
def listener
- @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self, @server.event_loop) }
end
def redis_connection_for_broadcasts
@redis_connection_for_broadcasts || @server.mutex.synchronize do
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts ||= redis_connection
end
end
+ def redis_connection
+ self.class.redis_connector.call(@server.config.cable)
+ end
+
class Listener < SubscriberMap
- def initialize(adapter)
+ def initialize(adapter, event_loop)
super()
@adapter = adapter
+ @event_loop = event_loop
@subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
@subscription_lock = Mutex.new
@@ -76,7 +85,7 @@ module ActionCable
if callbacks = @subscribe_callbacks[chan]
next_callback = callbacks.shift
- Concurrent.global_io_executor << next_callback if next_callback
+ @event_loop.post(&next_callback) if next_callback
@subscribe_callbacks.delete(chan) if callbacks.empty?
end
end
@@ -125,7 +134,7 @@ module ActionCable
end
def invoke_callback(*)
- Concurrent.global_io_executor.post { super }
+ @event_loop.post { super }
end
private