From 4efbbc844b3cd5f76e0e24ae1a1f90bb57da3b18 Mon Sep 17 00:00:00 2001 From: fatkodima Date: Thu, 14 Dec 2017 17:05:13 +0200 Subject: Add support for connection pooling on RedisCacheStore --- activesupport/CHANGELOG.md | 4 ++ activesupport/lib/active_support/cache.rb | 17 ++++++ .../lib/active_support/cache/mem_cache_store.rb | 13 +---- .../lib/active_support/cache/redis_cache_store.rb | 64 ++++++++++++++++++---- activesupport/test/cache/behaviors.rb | 1 + .../cache/behaviors/connection_pool_behavior.rb | 53 ++++++++++++++++++ .../test/cache/stores/mem_cache_store_test.rb | 55 ++----------------- .../test/cache/stores/redis_cache_store_test.rb | 32 +++++++++++ 8 files changed, 166 insertions(+), 73 deletions(-) create mode 100644 activesupport/test/cache/behaviors/connection_pool_behavior.rb (limited to 'activesupport') diff --git a/activesupport/CHANGELOG.md b/activesupport/CHANGELOG.md index acff6367f2..29d6119113 100644 --- a/activesupport/CHANGELOG.md +++ b/activesupport/CHANGELOG.md @@ -1,3 +1,7 @@ +* Add support for connection pooling on RedisCacheStore. + + *fatkodima* + * Support hash as first argument in `assert_difference`. This allows to specify multiple numeric differences in the same assertion. diff --git a/activesupport/lib/active_support/cache.rb b/activesupport/lib/active_support/cache.rb index 2d038dba77..d221b36365 100644 --- a/activesupport/lib/active_support/cache.rb +++ b/activesupport/lib/active_support/cache.rb @@ -160,6 +160,23 @@ module ActiveSupport attr_reader :silence, :options alias :silence? :silence + class << self + private + def retrieve_pool_options(options) + {}.tap do |pool_options| + pool_options[:size] = options[:pool_size] if options[:pool_size] + pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout] + end + end + + def ensure_connection_pool_added! + require "connection_pool" + rescue LoadError => e + $stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install" + raise e + end + end + # Creates a new cache. The options will be passed to any write method calls # except for :namespace which can be used to set the global # namespace for the cache. diff --git a/activesupport/lib/active_support/cache/mem_cache_store.rb b/activesupport/lib/active_support/cache/mem_cache_store.rb index cae0d44e7d..2840781dde 100644 --- a/activesupport/lib/active_support/cache/mem_cache_store.rb +++ b/activesupport/lib/active_support/cache/mem_cache_store.rb @@ -63,21 +63,12 @@ module ActiveSupport addresses = addresses.flatten options = addresses.extract_options! addresses = ["localhost:11211"] if addresses.empty? - - pool_options = {} - pool_options[:size] = options[:pool_size] if options[:pool_size] - pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout] + pool_options = retrieve_pool_options(options) if pool_options.empty? Dalli::Client.new(addresses, options) else - begin - require "connection_pool" - rescue LoadError => e - $stderr.puts "You don't have connection_pool installed in your application. Please add it to your Gemfile and run bundle install" - raise e - end - + ensure_connection_pool_added! ConnectionPool.new(pool_options) { Dalli::Client.new(addresses, options.merge(threadsafe: false)) } end end diff --git a/activesupport/lib/active_support/cache/redis_cache_store.rb b/activesupport/lib/active_support/cache/redis_cache_store.rb index 0368423dad..3347576651 100644 --- a/activesupport/lib/active_support/cache/redis_cache_store.rb +++ b/activesupport/lib/active_support/cache/redis_cache_store.rb @@ -20,6 +20,31 @@ require "active_support/core_ext/marshal" module ActiveSupport module Cache + module ConnectionPoolLike + def with + yield self + end + end + + ::Redis.include(ConnectionPoolLike) + + class RedisDistributedWithConnectionPool < ::Redis::Distributed + def add_node(options) + pool_options = {} + pool_options[:size] = options[:pool_size] if options[:pool_size] + pool_options[:timeout] = options[:pool_timeout] if options[:pool_timeout] + + if pool_options.empty? + super + else + options = { url: options } if options.is_a?(String) + options = @default_options.merge(options) + pool = ConnectionPool.new(pool_options) { ::Redis.new(options) } + @ring.add_node(pool) + end + end + end + # Redis cache store. # # Deployment note: Take care to use a *dedicated Redis cache* rather @@ -122,7 +147,7 @@ module ActiveSupport private def build_redis_distributed_client(urls:, **redis_options) - ::Redis::Distributed.new([], DEFAULT_REDIS_OPTIONS.merge(redis_options)).tap do |dist| + RedisDistributedWithConnectionPool.new([], DEFAULT_REDIS_OPTIONS.merge(redis_options)).tap do |dist| urls.each { |u| dist.add_node url: u } end end @@ -172,7 +197,7 @@ module ActiveSupport end def redis - @redis ||= self.class.build_redis(**redis_options) + @redis ||= wrap_in_connection_pool(self.class.build_redis(**redis_options)) end def inspect @@ -211,7 +236,7 @@ module ActiveSupport instrument :delete_matched, matcher do case matcher when String - redis.eval DELETE_GLOB_LUA, [], [namespace_key(matcher, options)] + redis.with { |c| c.eval DELETE_GLOB_LUA, [], [namespace_key(matcher, options)] } else raise ArgumentError, "Only Redis glob strings are supported: #{matcher.inspect}" end @@ -228,7 +253,7 @@ module ActiveSupport # Failsafe: Raises errors. def increment(name, amount = 1, options = nil) instrument :increment, name, amount: amount do - redis.incrby normalize_key(name, options), amount + redis.with { |c| c.incrby normalize_key(name, options), amount } end end @@ -242,7 +267,7 @@ module ActiveSupport # Failsafe: Raises errors. def decrement(name, amount = 1, options = nil) instrument :decrement, name, amount: amount do - redis.decrby normalize_key(name, options), amount + redis.with { |c| c.decrby normalize_key(name, options), amount } end end @@ -263,7 +288,7 @@ module ActiveSupport if namespace = merged_options(options)[namespace] delete_matched "*", namespace: namespace else - redis.flushdb + redis.with { |c| c.flushdb } end end end @@ -279,6 +304,21 @@ module ActiveSupport end private + def wrap_in_connection_pool(redis_connection) + if redis_connection.is_a?(::Redis) + pool_options = self.class.send(:retrieve_pool_options, redis_options) + + if pool_options.empty? + redis_connection + else + self.class.send(:ensure_connection_pool_added!) + ConnectionPool.new(pool_options) { redis_connection } + end + else + redis_connection + end + end + def set_redis_capabilities case redis when Redis::Distributed @@ -294,7 +334,7 @@ module ActiveSupport # Read an entry from the cache. def read_entry(key, options = nil) failsafe :read_entry do - deserialize_entry redis.get(key) + deserialize_entry redis.with { |c| c.get(key) } end end @@ -303,7 +343,7 @@ module ActiveSupport options = merged_options(options) keys = names.map { |name| normalize_key(name, options) } - values = redis.mget(*keys) + values = redis.with { |c| c.mget(*keys) } names.zip(values).each_with_object({}) do |(name, value), results| if value @@ -334,9 +374,9 @@ module ActiveSupport modifiers[:nx] = unless_exist modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in - redis.set key, value, modifiers + redis.with { |c| c.set key, value, modifiers } else - redis.set key, value + redis.with { |c| c.set key, value } end end end @@ -344,7 +384,7 @@ module ActiveSupport # Delete an entry from the cache. def delete_entry(key, options) failsafe :delete_entry, returning: false do - redis.del key + redis.with { |c| c.del key } end end @@ -353,7 +393,7 @@ module ActiveSupport if entries.any? if mset_capable? && expires_in.nil? failsafe :write_multi_entries do - redis.mapped_mset(entries) + redis.with { |c| c.mapped_mset(entries) } end else super diff --git a/activesupport/test/cache/behaviors.rb b/activesupport/test/cache/behaviors.rb index cb08a10bba..0e07e2319d 100644 --- a/activesupport/test/cache/behaviors.rb +++ b/activesupport/test/cache/behaviors.rb @@ -5,5 +5,6 @@ require_relative "behaviors/cache_delete_matched_behavior" require_relative "behaviors/cache_increment_decrement_behavior" require_relative "behaviors/cache_store_behavior" require_relative "behaviors/cache_store_version_behavior" +require_relative "behaviors/connection_pool_behavior" require_relative "behaviors/encoded_key_cache_behavior" require_relative "behaviors/local_cache_behavior" diff --git a/activesupport/test/cache/behaviors/connection_pool_behavior.rb b/activesupport/test/cache/behaviors/connection_pool_behavior.rb new file mode 100644 index 0000000000..500d51a134 --- /dev/null +++ b/activesupport/test/cache/behaviors/connection_pool_behavior.rb @@ -0,0 +1,53 @@ +# frozen_string_literal: true + +module ConnectionPoolBehavior + def test_connection_pool + emulating_latency do + begin + cache = ActiveSupport::Cache.lookup_store(store, pool_size: 2, pool_timeout: 1) + cache.clear + + threads = [] + + assert_raises Timeout::Error do + # One of the three threads will fail in 1 second because our pool size + # is only two. + 3.times do + threads << Thread.new do + cache.read("latency") + end + end + + threads.each(&:join) + end + ensure + threads.each(&:kill) + end + end + end + + def test_no_connection_pool + emulating_latency do + begin + cache = ActiveSupport::Cache.lookup_store(store) + cache.clear + + threads = [] + + assert_nothing_raised do + # Default connection pool size is 5, assuming 10 will make sure that + # the connection pool isn't used at all. + 10.times do + threads << Thread.new do + cache.read("latency") + end + end + + threads.each(&:join) + end + ensure + threads.each(&:kill) + end + end + end +end diff --git a/activesupport/test/cache/stores/mem_cache_store_test.rb b/activesupport/test/cache/stores/mem_cache_store_test.rb index 7f537c3bbf..ccb3b7403f 100644 --- a/activesupport/test/cache/stores/mem_cache_store_test.rb +++ b/activesupport/test/cache/stores/mem_cache_store_test.rb @@ -45,56 +45,7 @@ class MemCacheStoreTest < ActiveSupport::TestCase include CacheIncrementDecrementBehavior include EncodedKeyCacheBehavior include AutoloadingCacheBehavior - - def test_connection_pool - emulating_latency do - begin - cache = ActiveSupport::Cache.lookup_store(:mem_cache_store, pool_size: 2, pool_timeout: 1) - cache.clear - - threads = [] - - assert_raises Timeout::Error do - # One of the three threads will fail in 1 second because our pool size - # is only two. - 3.times do - threads << Thread.new do - cache.read("latency") - end - end - - threads.each(&:join) - end - ensure - threads.each(&:kill) - end - end - end - - def test_no_connection_pool - emulating_latency do - begin - cache = ActiveSupport::Cache.lookup_store(:mem_cache_store) - cache.clear - - threads = [] - - assert_nothing_raised do - # Default connection pool size is 5, assuming 10 will make sure that - # the connection pool isn't used at all. - 10.times do - threads << Thread.new do - cache.read("latency") - end - end - - threads.each(&:join) - end - ensure - threads.each(&:kill) - end - end - end + include ConnectionPoolBehavior def test_raw_values cache = ActiveSupport::Cache.lookup_store(:mem_cache_store, raw: true) @@ -154,6 +105,10 @@ class MemCacheStoreTest < ActiveSupport::TestCase private + def store + :mem_cache_store + end + def emulating_latency old_client = Dalli.send(:remove_const, :Client) Dalli.const_set(:Client, SlowDalliClient) diff --git a/activesupport/test/cache/stores/redis_cache_store_test.rb b/activesupport/test/cache/stores/redis_cache_store_test.rb index 3635703002..cdbb5dc4c6 100644 --- a/activesupport/test/cache/stores/redis_cache_store_test.rb +++ b/activesupport/test/cache/stores/redis_cache_store_test.rb @@ -8,6 +8,18 @@ require_relative "../behaviors" module ActiveSupport::Cache::RedisCacheStoreTests DRIVER = %w[ ruby hiredis ].include?(ENV["REDIS_DRIVER"]) ? ENV["REDIS_DRIVER"] : "hiredis" + # Emulates a latency on Redis's back-end for the key latency to facilitate + # connection pool testing. + class SlowRedis < Redis + def get(key, options = {}) + if key =~ /latency/ + sleep 3 + else + super + end + end + end + class LookupTest < ActiveSupport::TestCase test "may be looked up as :redis_cache_store" do assert_kind_of ActiveSupport::Cache::RedisCacheStore, @@ -110,6 +122,26 @@ module ActiveSupport::Cache::RedisCacheStoreTests include AutoloadingCacheBehavior end + class RedisCacheStoreConnectionPoolBehaviourTest < StoreTest + include ConnectionPoolBehavior + + private + + def store + :redis_cache_store + end + + def emulating_latency + old_redis = Object.send(:remove_const, :Redis) + Object.const_set(:Redis, SlowRedis) + + yield + ensure + Object.send(:remove_const, :Redis) + Object.const_set(:Redis, old_redis) + end + end + # Separate test class so we can omit the namespace which causes expected, # appropriate complaints about incompatible string encodings. class KeyEncodingSafetyTest < StoreTest -- cgit v1.2.3