From e77368637e17e6a33db2713f651e85a09456c645 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Mon, 1 Feb 2016 01:30:00 +1030 Subject: Switch the default redis adapter to a single-stream model This new adapter does get a little more intimate with the redis-rb gem's implementation than I would like, but it's the least bad of the approaches I've come up with. --- Gemfile | 1 + Gemfile.lock | 1 + .../subscription_adapter/evented_redis.rb | 67 +++++++++ .../lib/action_cable/subscription_adapter/redis.rb | 156 +++++++++++++++++---- .../subscription_adapter/evented_redis_test.rb | 10 ++ .../test/subscription_adapter/redis_test.rb | 8 +- railties/lib/rails/generators/app_base.rb | 1 - railties/test/generators/app_generator_test.rb | 3 - 8 files changed, 212 insertions(+), 35 deletions(-) create mode 100644 actioncable/lib/action_cable/subscription_adapter/evented_redis.rb create mode 100644 actioncable/test/subscription_adapter/evented_redis_test.rb diff --git a/Gemfile b/Gemfile index f8df08ee43..78f9853bed 100644 --- a/Gemfile +++ b/Gemfile @@ -65,6 +65,7 @@ group :cable do gem 'puma', require: false gem 'em-hiredis', require: false + gem 'hiredis', require: false gem 'redis', require: false gem 'faye-websocket', require: false diff --git a/Gemfile.lock b/Gemfile.lock index d2d616b39d..b15d3498a8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -286,6 +286,7 @@ DEPENDENCIES delayed_job_active_record em-hiredis faye-websocket + hiredis jquery-rails json kindlerb (= 0.1.1) diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb new file mode 100644 index 0000000000..d697548cbd --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb @@ -0,0 +1,67 @@ +require 'thread' + +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' + +EventMachine.epoll if EventMachine.epoll? +EventMachine.kqueue if EventMachine.kqueue? + +module ActionCable + module SubscriptionAdapter + class EventedRedis < Base # :nodoc: + @@mutex = Mutex.new + + def initialize(*) + super + @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + end + + def broadcast(channel, payload) + redis_connection_for_broadcasts.publish(channel, payload) + end + + def subscribe(channel, message_callback, success_callback = nil) + redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback { |reply| success_callback.call } if success_callback + end + end + + def unsubscribe(channel, message_callback) + redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + end + + def shutdown + redis_connection_for_subscriptions.pubsub.close_connection + @redis_connection_for_subscriptions = nil + end + + private + def redis_connection_for_subscriptions + ensure_reactor_running + @redis_connection_for_subscriptions || @server.mutex.synchronize do + @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end + end + end + end + + def redis_connection_for_broadcasts + @redis_connection_for_broadcasts || @server.mutex.synchronize do + @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable) + end + end + + def ensure_reactor_running + return if EventMachine.reactor_running? + @@mutex.synchronize do + Thread.new { EventMachine.run } unless EventMachine.reactor_running? + Thread.pass until EventMachine.reactor_running? + end + end + end + end +end diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index 560b79df16..7076383efe 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,52 +1,40 @@ require 'thread' -gem 'em-hiredis', '~> 0.3.0' gem 'redis', '~> 3.0' -require 'em-hiredis' require 'redis' -EventMachine.epoll if EventMachine.epoll? -EventMachine.kqueue if EventMachine.kqueue? - module ActionCable module SubscriptionAdapter class Redis < Base # :nodoc: - @@mutex = Mutex.new - def initialize(*) super - @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil + @listener = nil + @redis_connection_for_broadcasts = nil end def broadcast(channel, payload) redis_connection_for_broadcasts.publish(channel, payload) end - def subscribe(channel, message_callback, success_callback = nil) - redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result| - result.callback { |reply| success_callback.call } if success_callback - end + def subscribe(channel, callback, success_callback = nil) + listener.add_subscriber(channel, callback, success_callback) end - def unsubscribe(channel, message_callback) - redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback) + def unsubscribe(channel, callback) + listener.remove_subscriber(channel, callback) end def shutdown - redis_connection_for_subscriptions.pubsub.close_connection - @redis_connection_for_subscriptions = nil + @listener.shutdown if @listener + end + + def redis_connection_for_subscriptions + ::Redis.new(@server.config.cable) end private - def redis_connection_for_subscriptions - ensure_reactor_running - @redis_connection_for_subscriptions || @server.mutex.synchronize do - @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." - end - end - end + def listener + @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) } end def redis_connection_for_broadcasts @@ -55,12 +43,120 @@ module ActionCable end end - def ensure_reactor_running - return if EventMachine.reactor_running? - @@mutex.synchronize do - Thread.new { EventMachine.run } unless EventMachine.reactor_running? - Thread.pass until EventMachine.reactor_running? + class Listener < SubscriberMap + def initialize(adapter) + super() + + @adapter = adapter + + @subscribe_callbacks = Hash.new { |h, k| h[k] = [] } + @subscription_lock = Mutex.new + + @raw_client = nil + + @when_connected = [] + + @thread = nil + end + + def listen(conn) + conn.without_reconnect do + original_client = conn.client + + conn.subscribe('_action_cable_internal') do |on| + on.subscribe do |chan, count| + @subscription_lock.synchronize do + if count == 1 + @raw_client = original_client + + until @when_connected.empty? + @when_connected.shift.call + end + end + + if callbacks = @subscribe_callbacks[chan] + next_callback = callbacks.shift + Concurrent.global_io_executor << next_callback if next_callback + @subscribe_callbacks.delete(chan) if callbacks.empty? + end + end + end + + on.message do |chan, message| + broadcast(chan, message) + end + + on.unsubscribe do |chan, count| + if count == 0 + @subscription_lock.synchronize do + @raw_client = nil + end + end + end + end + end + end + + def shutdown + @subscription_lock.synchronize do + return if @thread.nil? + + when_connected do + send_command('unsubscribe') + @raw_client = nil + end + end + + Thread.pass while @thread.alive? + end + + def add_channel(channel, on_success) + @subscription_lock.synchronize do + ensure_listener_running + @subscribe_callbacks[channel] << on_success + when_connected { send_command('subscribe', channel) } + end + end + + def remove_channel(channel) + @subscription_lock.synchronize do + when_connected { send_command('unsubscribe', channel) } + end + end + + def invoke_callback(*) + Concurrent.global_io_executor.post { super } end + + private + def ensure_listener_running + @thread ||= Thread.new do + Thread.current.abort_on_exception = true + + conn = @adapter.redis_connection_for_subscriptions + listen conn + end + end + + def when_connected(&block) + if @raw_client + block.call + else + @when_connected << block + end + end + + def send_command(*command) + @raw_client.write(command) + + very_raw_connection = + @raw_client.connection.instance_variable_defined?(:@connection) && + @raw_client.connection.instance_variable_get(:@connection) + + if very_raw_connection && very_raw_connection.respond_to?(:flush) + very_raw_connection.flush + end + end end end end diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb new file mode 100644 index 0000000000..70333e51bd --- /dev/null +++ b/actioncable/test/subscription_adapter/evented_redis_test.rb @@ -0,0 +1,10 @@ +require 'test_helper' +require_relative './common' + +class EventedRedisAdapterTest < ActionCable::TestCase + include CommonSubscriptionAdapterTest + + def cable_config + { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' } + end +end diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb index 8d52832c87..4f34dd86c9 100644 --- a/actioncable/test/subscription_adapter/redis_test.rb +++ b/actioncable/test/subscription_adapter/redis_test.rb @@ -5,6 +5,12 @@ class RedisAdapterTest < ActionCable::TestCase include CommonSubscriptionAdapterTest def cable_config - { adapter: 'redis', url: 'redis://127.0.0.1:6379/12' } + { adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' } + end +end + +class RedisAdapterTest::Hiredis < RedisAdapterTest + def cable_config + super.merge(driver: 'hiredis') end end diff --git a/railties/lib/rails/generators/app_base.rb b/railties/lib/rails/generators/app_base.rb index c629459d95..51394de824 100644 --- a/railties/lib/rails/generators/app_base.rb +++ b/railties/lib/rails/generators/app_base.rb @@ -344,7 +344,6 @@ module Rails return [] if options[:skip_action_cable] comment = 'Action Cable dependencies for the Redis adapter' gems = [] - gems << GemfileEntry.new("em-hiredis", '~> 0.3.0', comment) gems << GemfileEntry.new("redis", '~> 3.0', comment) gems end diff --git a/railties/test/generators/app_generator_test.rb b/railties/test/generators/app_generator_test.rb index 53f8ed6af0..49536c068f 100644 --- a/railties/test/generators/app_generator_test.rb +++ b/railties/test/generators/app_generator_test.rb @@ -400,7 +400,6 @@ class AppGeneratorTest < Rails::Generators::TestCase assert_no_match(/action_cable_meta_tag/, content) end assert_file "Gemfile" do |content| - assert_no_match(/em-hiredis/, content) assert_no_match(/redis/, content) end end @@ -412,14 +411,12 @@ class AppGeneratorTest < Rails::Generators::TestCase assert_no_file "app/assets/javascripts/cable.coffee" assert_no_file "app/channels" assert_file "Gemfile" do |content| - assert_no_match(/em-hiredis/, content) assert_no_match(/redis/, content) end end def test_action_cable_redis_gems run_generator - assert_gem 'em-hiredis' assert_gem 'redis' end -- cgit v1.2.3