aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthew Draper <matthew@trebex.net>2016-02-01 01:30:00 +1030
committerMatthew Draper <matthew@trebex.net>2016-02-01 01:56:47 +1030
commite77368637e17e6a33db2713f651e85a09456c645 (patch)
tree6d62404d3b64969d04d9f70a4c5c9ed44f2253e9
parent6162c49e40582bf058a6bb82ccc0cfb8f92332b6 (diff)
downloadrails-e77368637e17e6a33db2713f651e85a09456c645.tar.gz
rails-e77368637e17e6a33db2713f651e85a09456c645.tar.bz2
rails-e77368637e17e6a33db2713f651e85a09456c645.zip
Switch the default redis adapter to a single-stream model
This new adapter does get a little more intimate with the redis-rb gem's implementation than I would like, but it's the least bad of the approaches I've come up with.
-rw-r--r--Gemfile1
-rw-r--r--Gemfile.lock1
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/evented_redis.rb67
-rw-r--r--actioncable/lib/action_cable/subscription_adapter/redis.rb156
-rw-r--r--actioncable/test/subscription_adapter/evented_redis_test.rb10
-rw-r--r--actioncable/test/subscription_adapter/redis_test.rb8
-rw-r--r--railties/lib/rails/generators/app_base.rb1
-rw-r--r--railties/test/generators/app_generator_test.rb3
8 files changed, 212 insertions, 35 deletions
diff --git a/Gemfile b/Gemfile
index f8df08ee43..78f9853bed 100644
--- a/Gemfile
+++ b/Gemfile
@@ -65,6 +65,7 @@ group :cable do
gem 'puma', require: false
gem 'em-hiredis', require: false
+ gem 'hiredis', require: false
gem 'redis', require: false
gem 'faye-websocket', require: false
diff --git a/Gemfile.lock b/Gemfile.lock
index d2d616b39d..b15d3498a8 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -286,6 +286,7 @@ DEPENDENCIES
delayed_job_active_record
em-hiredis
faye-websocket
+ hiredis
jquery-rails
json
kindlerb (= 0.1.1)
diff --git a/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
new file mode 100644
index 0000000000..d697548cbd
--- /dev/null
+++ b/actioncable/lib/action_cable/subscription_adapter/evented_redis.rb
@@ -0,0 +1,67 @@
+require 'thread'
+
+gem 'em-hiredis', '~> 0.3.0'
+gem 'redis', '~> 3.0'
+require 'em-hiredis'
+require 'redis'
+
+EventMachine.epoll if EventMachine.epoll?
+EventMachine.kqueue if EventMachine.kqueue?
+
+module ActionCable
+ module SubscriptionAdapter
+ class EventedRedis < Base # :nodoc:
+ @@mutex = Mutex.new
+
+ def initialize(*)
+ super
+ @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ end
+
+ def broadcast(channel, payload)
+ redis_connection_for_broadcasts.publish(channel, payload)
+ end
+
+ def subscribe(channel, message_callback, success_callback = nil)
+ redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
+ result.callback { |reply| success_callback.call } if success_callback
+ end
+ end
+
+ def unsubscribe(channel, message_callback)
+ redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ end
+
+ def shutdown
+ redis_connection_for_subscriptions.pubsub.close_connection
+ @redis_connection_for_subscriptions = nil
+ end
+
+ private
+ def redis_connection_for_subscriptions
+ ensure_reactor_running
+ @redis_connection_for_subscriptions || @server.mutex.synchronize do
+ @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
+ redis.on(:reconnect_failed) do
+ @logger.info "[ActionCable] Redis reconnect failed."
+ end
+ end
+ end
+ end
+
+ def redis_connection_for_broadcasts
+ @redis_connection_for_broadcasts || @server.mutex.synchronize do
+ @redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
+ end
+ end
+
+ def ensure_reactor_running
+ return if EventMachine.reactor_running?
+ @@mutex.synchronize do
+ Thread.new { EventMachine.run } unless EventMachine.reactor_running?
+ Thread.pass until EventMachine.reactor_running?
+ end
+ end
+ end
+ end
+end
diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb
index 560b79df16..7076383efe 100644
--- a/actioncable/lib/action_cable/subscription_adapter/redis.rb
+++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb
@@ -1,52 +1,40 @@
require 'thread'
-gem 'em-hiredis', '~> 0.3.0'
gem 'redis', '~> 3.0'
-require 'em-hiredis'
require 'redis'
-EventMachine.epoll if EventMachine.epoll?
-EventMachine.kqueue if EventMachine.kqueue?
-
module ActionCable
module SubscriptionAdapter
class Redis < Base # :nodoc:
- @@mutex = Mutex.new
-
def initialize(*)
super
- @redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
+ @listener = nil
+ @redis_connection_for_broadcasts = nil
end
def broadcast(channel, payload)
redis_connection_for_broadcasts.publish(channel, payload)
end
- def subscribe(channel, message_callback, success_callback = nil)
- redis_connection_for_subscriptions.pubsub.subscribe(channel, &message_callback).tap do |result|
- result.callback { |reply| success_callback.call } if success_callback
- end
+ def subscribe(channel, callback, success_callback = nil)
+ listener.add_subscriber(channel, callback, success_callback)
end
- def unsubscribe(channel, message_callback)
- redis_connection_for_subscriptions.pubsub.unsubscribe_proc(channel, message_callback)
+ def unsubscribe(channel, callback)
+ listener.remove_subscriber(channel, callback)
end
def shutdown
- redis_connection_for_subscriptions.pubsub.close_connection
- @redis_connection_for_subscriptions = nil
+ @listener.shutdown if @listener
+ end
+
+ def redis_connection_for_subscriptions
+ ::Redis.new(@server.config.cable)
end
private
- def redis_connection_for_subscriptions
- ensure_reactor_running
- @redis_connection_for_subscriptions || @server.mutex.synchronize do
- @redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
- redis.on(:reconnect_failed) do
- @logger.info "[ActionCable] Redis reconnect failed."
- end
- end
- end
+ def listener
+ @listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
end
def redis_connection_for_broadcasts
@@ -55,12 +43,120 @@ module ActionCable
end
end
- def ensure_reactor_running
- return if EventMachine.reactor_running?
- @@mutex.synchronize do
- Thread.new { EventMachine.run } unless EventMachine.reactor_running?
- Thread.pass until EventMachine.reactor_running?
+ class Listener < SubscriberMap
+ def initialize(adapter)
+ super()
+
+ @adapter = adapter
+
+ @subscribe_callbacks = Hash.new { |h, k| h[k] = [] }
+ @subscription_lock = Mutex.new
+
+ @raw_client = nil
+
+ @when_connected = []
+
+ @thread = nil
+ end
+
+ def listen(conn)
+ conn.without_reconnect do
+ original_client = conn.client
+
+ conn.subscribe('_action_cable_internal') do |on|
+ on.subscribe do |chan, count|
+ @subscription_lock.synchronize do
+ if count == 1
+ @raw_client = original_client
+
+ until @when_connected.empty?
+ @when_connected.shift.call
+ end
+ end
+
+ if callbacks = @subscribe_callbacks[chan]
+ next_callback = callbacks.shift
+ Concurrent.global_io_executor << next_callback if next_callback
+ @subscribe_callbacks.delete(chan) if callbacks.empty?
+ end
+ end
+ end
+
+ on.message do |chan, message|
+ broadcast(chan, message)
+ end
+
+ on.unsubscribe do |chan, count|
+ if count == 0
+ @subscription_lock.synchronize do
+ @raw_client = nil
+ end
+ end
+ end
+ end
+ end
+ end
+
+ def shutdown
+ @subscription_lock.synchronize do
+ return if @thread.nil?
+
+ when_connected do
+ send_command('unsubscribe')
+ @raw_client = nil
+ end
+ end
+
+ Thread.pass while @thread.alive?
+ end
+
+ def add_channel(channel, on_success)
+ @subscription_lock.synchronize do
+ ensure_listener_running
+ @subscribe_callbacks[channel] << on_success
+ when_connected { send_command('subscribe', channel) }
+ end
+ end
+
+ def remove_channel(channel)
+ @subscription_lock.synchronize do
+ when_connected { send_command('unsubscribe', channel) }
+ end
+ end
+
+ def invoke_callback(*)
+ Concurrent.global_io_executor.post { super }
end
+
+ private
+ def ensure_listener_running
+ @thread ||= Thread.new do
+ Thread.current.abort_on_exception = true
+
+ conn = @adapter.redis_connection_for_subscriptions
+ listen conn
+ end
+ end
+
+ def when_connected(&block)
+ if @raw_client
+ block.call
+ else
+ @when_connected << block
+ end
+ end
+
+ def send_command(*command)
+ @raw_client.write(command)
+
+ very_raw_connection =
+ @raw_client.connection.instance_variable_defined?(:@connection) &&
+ @raw_client.connection.instance_variable_get(:@connection)
+
+ if very_raw_connection && very_raw_connection.respond_to?(:flush)
+ very_raw_connection.flush
+ end
+ end
end
end
end
diff --git a/actioncable/test/subscription_adapter/evented_redis_test.rb b/actioncable/test/subscription_adapter/evented_redis_test.rb
new file mode 100644
index 0000000000..70333e51bd
--- /dev/null
+++ b/actioncable/test/subscription_adapter/evented_redis_test.rb
@@ -0,0 +1,10 @@
+require 'test_helper'
+require_relative './common'
+
+class EventedRedisAdapterTest < ActionCable::TestCase
+ include CommonSubscriptionAdapterTest
+
+ def cable_config
+ { adapter: 'evented_redis', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
diff --git a/actioncable/test/subscription_adapter/redis_test.rb b/actioncable/test/subscription_adapter/redis_test.rb
index 8d52832c87..4f34dd86c9 100644
--- a/actioncable/test/subscription_adapter/redis_test.rb
+++ b/actioncable/test/subscription_adapter/redis_test.rb
@@ -5,6 +5,12 @@ class RedisAdapterTest < ActionCable::TestCase
include CommonSubscriptionAdapterTest
def cable_config
- { adapter: 'redis', url: 'redis://127.0.0.1:6379/12' }
+ { adapter: 'redis', driver: 'ruby', url: 'redis://127.0.0.1:6379/12' }
+ end
+end
+
+class RedisAdapterTest::Hiredis < RedisAdapterTest
+ def cable_config
+ super.merge(driver: 'hiredis')
end
end
diff --git a/railties/lib/rails/generators/app_base.rb b/railties/lib/rails/generators/app_base.rb
index c629459d95..51394de824 100644
--- a/railties/lib/rails/generators/app_base.rb
+++ b/railties/lib/rails/generators/app_base.rb
@@ -344,7 +344,6 @@ module Rails
return [] if options[:skip_action_cable]
comment = 'Action Cable dependencies for the Redis adapter'
gems = []
- gems << GemfileEntry.new("em-hiredis", '~> 0.3.0', comment)
gems << GemfileEntry.new("redis", '~> 3.0', comment)
gems
end
diff --git a/railties/test/generators/app_generator_test.rb b/railties/test/generators/app_generator_test.rb
index 53f8ed6af0..49536c068f 100644
--- a/railties/test/generators/app_generator_test.rb
+++ b/railties/test/generators/app_generator_test.rb
@@ -400,7 +400,6 @@ class AppGeneratorTest < Rails::Generators::TestCase
assert_no_match(/action_cable_meta_tag/, content)
end
assert_file "Gemfile" do |content|
- assert_no_match(/em-hiredis/, content)
assert_no_match(/redis/, content)
end
end
@@ -412,14 +411,12 @@ class AppGeneratorTest < Rails::Generators::TestCase
assert_no_file "app/assets/javascripts/cable.coffee"
assert_no_file "app/channels"
assert_file "Gemfile" do |content|
- assert_no_match(/em-hiredis/, content)
assert_no_match(/redis/, content)
end
end
def test_action_cable_redis_gems
run_generator
- assert_gem 'em-hiredis'
assert_gem 'redis'
end