aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable/subscription_adapter
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable/subscription_adapter')
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb8
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb33
4 files changed, 51 insertions, 12 deletions
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index 85d4892e4c..cca6894289 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -4,17 +4,17 @@ module ActionCable
module SubscriptionAdapter
class Async < Inline # :nodoc:
private
- def subscriber_map
- @subscriber_map ||= AsyncSubscriberMap.new
+ def new_subscriber_map
+ AsyncSubscriberMap.new
end
class AsyncSubscriberMap < SubscriberMap
def add_subscriber(*)
- ::EM.next_tick { super }
+ Concurrent.global_io_executor.post { super }
end
def invoke_callback(*)
- ::EM.next_tick { super }
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 4a2a8d23a2..81357faead 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -1,6 +1,11 @@
module ActionCable
module SubscriptionAdapter
class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+
def broadcast(channel, payload)
subscriber_map.broadcast(channel, payload)
end
@@ -19,7 +24,11 @@ module ActionCable
private
def subscriber_map
- @subscriber_map ||= SubscriberMap.new
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+
+ def new_subscriber_map
+ SubscriberMap.new
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 78f8aeb599..abaeb92e54 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -5,6 +5,11 @@ require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
@@ -37,7 +42,7 @@ module ActionCable
private
def listener
- @listener ||= Listener.new(self)
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
class Listener < SubscriberMap
@@ -63,7 +68,7 @@ module ActionCable
case action
when :listen
pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}")
- ::EM.next_tick(&callback) if callback
+ Concurrent.global_io_executor << callback if callback
when :unlisten
pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}")
when :shutdown
@@ -93,7 +98,7 @@ module ActionCable
end
def invoke_callback(*)
- ::EM.next_tick { super }
+ Concurrent.global_io_executor.post { super }
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 3b86354621..560b79df16 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,11 +1,23 @@
+require 'thread'
+
gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
require 'em-hiredis'
require 'redis'
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
+ @@mutex = Mutex.new
+
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
@@ -27,15 +39,28 @@ module ActionCable
private
def redis_connection_for_subscriptions
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
end
end
end
def redis_connection_for_broadcasts
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
end
end
end