aboutsummaryrefslogtreecommitdiffstats
path: root/actioncable/lib/action_cable
diff options
context:
space:
mode:
Diffstat (limited to 'actioncable/lib/action_cable')
-rw-r--r--actioncable/lib/action_cable/server/base.rb23
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/async.rb4
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/inline.rb11
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/postgresql.rb7
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb17
5 files changed, 47 insertions, 15 deletions
diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb
index b00abd208c..fe48c112df 100644
--- a/actioncable/lib/action_cable/server/base.rb
+++ b/actioncable/lib/action_cable/server/base.rb
@@ -1,3 +1,5 @@
+require 'thread'
+
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
@@ -13,7 +15,12 @@ module ActionCable
def self.logger; config.logger; end
delegate :logger, to: :config
+ attr_reader :mutex
+
def initialize
+ @mutex = Mutex.new
+
+ @remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
end
# Called by rack to setup the server.
@@ -29,29 +36,31 @@ module ActionCable
# Gateway to RemoteConnections. See that class for details.
def remote_connections
- @remote_connections ||= RemoteConnections.new(self)
+ @remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
end
def stream_event_loop
- @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
+ @stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
- @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
+ @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end
# Requires and returns a hash of all the channel class constants keyed by name.
def channel_classes
- @channel_classes ||= begin
- config.channel_paths.each { |channel_path| require channel_path }
- config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ @channel_classes || @mutex.synchronize do
+ @channel_classes ||= begin
+ config.channel_paths.each { |channel_path| require channel_path }
+ config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
+ end
end
end
# Adapter used for all streams/broadcasting.
def pubsub
- @pubsub ||= config.pubsub_adapter.new(self)
+ @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
end
# All the identifiers applied to the connection class associated with this server.
diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb
index c88b03947a..cca6894289 100644
--- a/actioncable/lib/action_cable/subscription_adapter/async.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/async.rb
@@ -4,8 +4,8 @@ module ActionCable
module SubscriptionAdapter
class Async < Inline # :nodoc:
private
- def subscriber_map
- @subscriber_map ||= AsyncSubscriberMap.new
+ def new_subscriber_map
+ AsyncSubscriberMap.new
end
class AsyncSubscriberMap < SubscriberMap
diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb
index 4a2a8d23a2..81357faead 100644
--- a/actioncable/lib/action_cable/subscription_adapter/inline.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb
@@ -1,6 +1,11 @@
module ActionCable
module SubscriptionAdapter
class Inline < Base # :nodoc:
+ def initialize(*)
+ super
+ @subscriber_map = nil
+ end
+
def broadcast(channel, payload)
subscriber_map.broadcast(channel, payload)
end
@@ -19,7 +24,11 @@ module ActionCable
private
def subscriber_map
- @subscriber_map ||= SubscriberMap.new
+ @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
+ end
+
+ def new_subscriber_map
+ SubscriberMap.new
end
end
end
diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
index 3ce1bbed68..abaeb92e54 100644
--- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb
@@ -5,6 +5,11 @@ require 'thread'
module ActionCable
module SubscriptionAdapter
class PostgreSQL < Base # :nodoc:
+ def initialize(*)
+ super
+ @listener = nil
+ end
+
def broadcast(channel, payload)
with_connection do |pg_conn|
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
@@ -37,7 +42,7 @@ module ActionCable
private
def listener
- @listener ||= Listener.new(self)
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
class Listener < SubscriberMap
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index a035e3988d..560b79df16 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -13,6 +13,11 @@ module ActionCable
class Redis < Base # :nodoc:
@@mutex = Mutex.new
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
@@ -35,15 +40,19 @@ module ActionCable
private
def redis_connection_for_subscriptions
ensure_reactor_running
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
end
end
end
def redis_connection_for_broadcasts
- @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
end
def ensure_reactor_running