From 9ea7aa84d16d99fd32ed1877e3fd6631a41e7042 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Wed, 27 Jan 2016 14:33:15 +0100 Subject: Revert "Eliminate the EventMachine dependency" --- .../lib/action_cable/subscription_adapter/async.rb | 4 ++-- .../lib/action_cable/subscription_adapter/postgresql.rb | 4 ++-- .../lib/action_cable/subscription_adapter/redis.rb | 16 ---------------- 3 files changed, 4 insertions(+), 20 deletions(-) (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..85d4892e4c 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..78f8aeb599 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - Concurrent.global_io_executor << callback if callback + ::EM.next_tick(&callback) if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - Concurrent.global_io_executor.post { super } + ::EM.next_tick { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..3b86354621 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,18 +1,11 @@ -require 'thread' - gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -34,7 +27,6 @@ module ActionCable private def redis_connection_for_subscriptions - ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -45,14 +37,6 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end - - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? - end - end end end end -- cgit v1.2.3 From 74497eabd52f2f9f8c383808b11286283046c2b2 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 28 Jan 2016 15:25:31 +1030 Subject: Revert "Revert "Eliminate the EventMachine dependency"" --- .../lib/action_cable/subscription_adapter/async.rb | 4 ++-- .../lib/action_cable/subscription_adapter/postgresql.rb | 4 ++-- .../lib/action_cable/subscription_adapter/redis.rb | 16 ++++++++++++++++ 3 files changed, 20 insertions(+), 4 deletions(-) (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index 85d4892e4c..c88b03947a 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -10,11 +10,11 @@ module ActionCable class AsyncSubscriberMap < SubscriberMap def add_subscriber(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 78f8aeb599..3ce1bbed68 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -63,7 +63,7 @@ module ActionCable case action when :listen pg_conn.exec("LISTEN #{pg_conn.escape_identifier channel}") - ::EM.next_tick(&callback) if callback + Concurrent.global_io_executor << callback if callback when :unlisten pg_conn.exec("UNLISTEN #{pg_conn.escape_identifier channel}") when :shutdown @@ -93,7 +93,7 @@ module ActionCable end def invoke_callback(*) - ::EM.next_tick { super } + Concurrent.global_io_executor.post { super } end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 3b86354621..a035e3988d 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,11 +1,18 @@ +require 'thread' + gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' require 'em-hiredis' require 'redis' +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + @@mutex = Mutex.new + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -27,6 +34,7 @@ module ActionCable private def redis_connection_for_subscriptions + ensure_reactor_running @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." @@ -37,6 +45,14 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end end end end -- cgit v1.2.3 From 16a6603956551703e3bbd06101c568a73bcdaa52 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 25 Jan 2016 03:53:27 +1030 Subject: Synchronize the lazy setters in Server They're all at risk of races on the first requests. --- .../lib/action_cable/subscription_adapter/async.rb | 4 ++-- .../lib/action_cable/subscription_adapter/inline.rb | 11 ++++++++++- .../lib/action_cable/subscription_adapter/postgresql.rb | 7 ++++++- .../lib/action_cable/subscription_adapter/redis.rb | 17 +++++++++++++---- 4 files changed, 31 insertions(+), 8 deletions(-) (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/async.rb b/actioncable/lib/action_cable/subscription_adapter/async.rb index c88b03947a..cca6894289 100644 --- a/actioncable/lib/action_cable/subscription_adapter/async.rb +++ b/actioncable/lib/action_cable/subscription_adapter/async.rb @@ -4,8 +4,8 @@ module ActionCable module SubscriptionAdapter class Async < Inline # :nodoc: private - def subscriber_map - @subscriber_map ||= AsyncSubscriberMap.new + def new_subscriber_map + AsyncSubscriberMap.new end class AsyncSubscriberMap < SubscriberMap diff --git a/actioncable/lib/action_cable/subscription_adapter/inline.rb b/actioncable/lib/action_cable/subscription_adapter/inline.rb index 4a2a8d23a2..81357faead 100644 --- a/actioncable/lib/action_cable/subscription_adapter/inline.rb +++ b/actioncable/lib/action_cable/subscription_adapter/inline.rb @@ -1,6 +1,11 @@ module ActionCable module SubscriptionAdapter class Inline < Base # :nodoc: + def initialize(*) + super + @subscriber_map = nil + end + def broadcast(channel, payload) subscriber_map.broadcast(channel, payload) end @@ -19,7 +24,11 @@ module ActionCable private def subscriber_map - @subscriber_map ||= SubscriberMap.new + @subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map } + end + + def new_subscriber_map + SubscriberMap.new end end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 3ce1bbed68..abaeb92e54 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -5,6 +5,11 @@ require 'thread' module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: + def initialize(*) + super + @listener = nil + end + def broadcast(channel, payload) with_connection do |pg_conn| pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") @@ -37,7 +42,7 @@ module ActionCable private def listener - @listener ||= Listener.new(self) + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end class Listener < SubscriberMap diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index a035e3988d..560b79df16 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -13,6 +13,11 @@ module ActionCable class Redis < Base # :nodoc: @@mutex = Mutex.new + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end @@ -35,15 +40,19 @@ module ActionCable private def redis_connection_for_subscriptions ensure_reactor_running - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end end end end def redis_connection_for_broadcasts - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end end def ensure_reactor_running -- cgit v1.2.3 From e77368637e17e6a33db2713f651e85a09456c645 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 01:30:00 +1030 Subject: Switch the default redis adapter to a single-stream model This new adapter does get a little more intimate with the redis-rb gem's implementation than I would like, but it's the least bad of the approaches I've come up with. --- .../subscription_adapter/evented_redis.rb | 67 +++++++++ .../lib/action_cable/subscription_adapter/redis.rb | 156 +++++++++++++++++---- 2 files changed, 193 insertions(+), 30 deletions(-) create mode 100644 actioncable/lib/action_cable/subscription_adapter/evented_redis.rb (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb new file mode 100644 index 0000000000..d697548cbd --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -0,0 +1,67 @@ +require 'thread' + +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' + +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module SubscriptionAdapter + class EventedRedis < Base # :nodoc: + @@mutex = Mutex.new + + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + + def broadcast(channel, payload) + redis_connection_for_broadcasts.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback { |reply| success_callback.call } if success_callback + end + end + + def unsubscribe(channel, message_callback) + redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + end + + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + + private + def redis_connection_for_subscriptions + ensure_reactor_running + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end + end + end + end + + def redis_connection_for_broadcasts + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end + end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 560b79df16..7076383efe 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,52 +1,40 @@ require 'thread' -gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' -require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def initialize(*) super - @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + @listener = nil + @redis_connection_for_broadcasts = nil end def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end - def subscribe(channel, message_callback, success_callback = nil) - redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback { |reply| success_callback.call } if success_callback - end + def subscribe(channel, callback, success_callback = nil) + listener.add_subscriber(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + def unsubscribe(channel, callback) + listener.remove_subscriber(channel, callback) end def shutdown - redis_connection_for_subscriptions.pubsub.close_connection - @redis_connection_for_subscriptions = nil + @listener.shutdown if @listener + end + + def redis_connection_for_subscriptions + ::Redis.new(@server.config.cable) end private - def redis_connection_for_subscriptions - ensure_reactor_running - @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." - end - end - end + def listener + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end def redis_connection_for_broadcasts @@ -55,12 +43,120 @@ module ActionCable end end - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? + class Listener < SubscriberMap + def initialize(adapter) + super() + + @adapter = adapter + + @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } + @subscription_lock = Mutex.new + + @raw_client = nil + + @when_connected = [] + + @thread = nil + end + + def listen(conn) + conn.without_reconnect do + original_client = conn.client + + conn.subscribe('_action_cable_internal') do |on| + on.subscribe do |chan, count| + @subscription_lock.synchronize do + if count == 1 + @raw_client = original_client + + until @when_connected.empty? + @when_connected.shift.call + end + end + + if callbacks = @subscribe_callbacks[chan] + next_callback = callbacks.shift + Concurrent.global_io_executor << next_callback if next_callback + @subscribe_callbacks.delete(chan) if callbacks.empty? + end + end + end + + on.message do |chan, message| + broadcast(chan, message) + end + + on.unsubscribe do |chan, count| + if count == 0 + @subscription_lock.synchronize do + @raw_client = nil + end + end + end + end + end + end + + def shutdown + @subscription_lock.synchronize do + return if @thread.nil? + + when_connected do + send_command('unsubscribe') + @raw_client = nil + end + end + + Thread.pass while @thread.alive? + end + + def add_channel(channel, on_success) + @subscription_lock.synchronize do + ensure_listener_running + @subscribe_callbacks[channel] << on_success + when_connected { send_command('subscribe', channel) } + end + end + + def remove_channel(channel) + @subscription_lock.synchronize do + when_connected { send_command('unsubscribe', channel) } + end + end + + def invoke_callback(*) + Concurrent.global_io_executor.post { super } end + + private + def ensure_listener_running + @thread ||= Thread.new do + Thread.current.abort_on_exception = true + + conn = @adapter.redis_connection_for_subscriptions + listen conn + end + end + + def when_connected(&block) + if @raw_client + block.call + else + @when_connected << block + end + end + + def send_command(*command) + @raw_client.write(command) + + very_raw_connection = + @raw_client.connection.instance_variable_defined?(:@connection) && + @raw_client.connection.instance_variable_get(:@connection) + + if very_raw_connection && very_raw_connection.respond_to?(:flush) + very_raw_connection.flush + end + end end end end -- cgit v1.2.3 From 5e5fd246d5852b1c49dfdb8e635fb2e2c6ae8e55 Mon Sep 17 00:00:00 2001 From: David Heinemeier Hansson Date: Thu, 4 Feb 2016 12:10:35 +0100 Subject: Allow for non-standard redis connectors --- .../lib/action_cable/subscription_adapter/evented_redis.rb | 12 ++++++++++-- actioncable/lib/action_cable/subscription_adapter/redis.rb | 6 +++++- 2 files changed, 15 insertions(+), 3 deletions(-) (limited to 'actioncable/lib/action_cable/subscription_adapter') diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb index d697548cbd..af04a58c70 100644 --- a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -13,6 +13,14 @@ module ActionCable class EventedRedis < Base # :nodoc: @@mutex = Mutex.new + # Overwrite this factory method for EventMachine redis connections if you want to use a different Redis library than EM::Hiredis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:em_redis_connector) { ->(config) { EM::Hiredis.connect(config[:url]) } } + + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil @@ -41,7 +49,7 @@ module ActionCable def redis_connection_for_subscriptions ensure_reactor_running @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + @redis_connection_for_subscriptions ||= self.class.em_redis_connector.call(@server.config.cable).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." end @@ -51,7 +59,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 7076383efe..ba4934a264 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -6,6 +6,10 @@ require 'redis' module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: + # Overwrite this factory method for redis connections if you want to use a different Redis library than Redis. + # This is needed, for example, when using Makara proxies for distributed Redis. + cattr_accessor(:redis_connector) { ->(config) { ::Redis.new(url: config[:url]) } } + def initialize(*) super @listener = nil @@ -39,7 +43,7 @@ module ActionCable def redis_connection_for_broadcasts @redis_connection_for_broadcasts || @server.mutex.synchronize do - @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + @redis_connection_for_broadcasts ||= self.class.redis_connector.call(@server.config.cable) end end -- cgit v1.2.3