From ee51b51b60f9e6cce9babed2c8a65a14d87790c8 Mon Sep 17 00:00:00 2001 From: Brian Durand Date: Wed, 21 Apr 2010 23:22:05 -0500 Subject: ActiveSupport::Cache refactoring All Caches * Add default options to initializer that will be sent to all read, write, fetch, exist?, increment, and decrement * Add support for the :expires_in option to fetch and write for all caches. Cache entries are stored with the create timestamp and a ttl so that expiration can be handled independently of the implementation. * Add support for a :namespace option. This can be used to set a global prefix for cache entries. * Deprecate expand_cache_key on ActiveSupport::Cache and move it to ActionController::Caching and ActionDispatch::Http::Cache since the logic in the method used some Rails specific environment variables and was only used by ActionPack classes. Not very DRY but there didn't seem to be a good shared spot and ActiveSupport really shouldn't be Rails specific. * Add support for :race_condition_ttl to fetch. This setting can prevent race conditions on fetch calls where several processes try to regenerate a recently expired entry at once. * Add support for :compress option to fetch and write which will compress any data over a configurable threshold. * Nil values can now be stored in the cache and are distinct from cache misses for fetch. * Easier API to create new implementations. Just need to implement the methods read_entry, write_entry, and delete_entry instead of overwriting existing methods. * Since all cache implementations support storing objects, update the docs to state that ActiveCache::Cache::Store implementations should store objects. Keys, however, must be strings since some implementations require that. * Increase test coverage. * Document methods which are provided as convenience but which may not be universally available. MemoryStore * MemoryStore can now safely be used as the cache for single server sites. * Make thread safe so that the default cache implementation used by Rails is thread safe. The overhead is minimal and it is still the fastest store available. * Provide :size initialization option indicating the maximum size of the cache in memory (defaults to 32Mb). * Add prune logic that removes the least recently used cache entries to keep the cache size from exceeding the max. * Deprecated SynchronizedMemoryStore since it isn't needed anymore. FileStore * Escape key values so they will work as file names on all file systems, be consistent, and case sensitive * Use a hash algorithm to segment the cache into sub directories so that a large cache doesn't exceed file system limits. * FileStore can be slow so implement the LocalCache strategy to cache reads for the duration of a request. * Add cleanup method to keep the disk from filling up with expired entries. * Fix increment and decrement to use file system locks so they are consistent between processes. MemCacheStore * Support all keys. Previously keys with spaces in them would fail * Deprecate CompressedMemCacheStore since it isn't needed anymore (use :compress => true) [#4452 state:committed] Signed-off-by: Jeremy Kemper --- activesupport/lib/active_support/cache.rb | 496 +++++++++++++++++++++++++----- 1 file changed, 427 insertions(+), 69 deletions(-) (limited to 'activesupport/lib/active_support/cache.rb') diff --git a/activesupport/lib/active_support/cache.rb b/activesupport/lib/active_support/cache.rb index 7213b24f2d..ec5007c284 100644 --- a/activesupport/lib/active_support/cache.rb +++ b/activesupport/lib/active_support/cache.rb @@ -1,8 +1,12 @@ require 'benchmark' +require 'zlib' +require 'active_support/core_ext/array/extract_options' require 'active_support/core_ext/array/wrap' require 'active_support/core_ext/benchmark' require 'active_support/core_ext/exception' require 'active_support/core_ext/class/attribute_accessors' +require 'active_support/core_ext/numeric/bytes' +require 'active_support/core_ext/numeric/time' require 'active_support/core_ext/object/to_param' require 'active_support/core_ext/string/inflections' @@ -11,10 +15,16 @@ module ActiveSupport module Cache autoload :FileStore, 'active_support/cache/file_store' autoload :MemoryStore, 'active_support/cache/memory_store' - autoload :SynchronizedMemoryStore, 'active_support/cache/synchronized_memory_store' autoload :MemCacheStore, 'active_support/cache/mem_cache_store' + autoload :SynchronizedMemoryStore, 'active_support/cache/synchronized_memory_store' autoload :CompressedMemCacheStore, 'active_support/cache/compressed_mem_cache_store' + EMPTY_OPTIONS = {}.freeze + + # These options mean something to all cache implementations. Individual cache + # implementations may support additional optons. + UNIVERSAL_OPTIONS = [:namespace, :compress, :compress_threshold, :expires_in, :race_condition_ttl] + module Strategy autoload :LocalCache, 'active_support/cache/strategy/local_cache' end @@ -59,15 +69,12 @@ module ActiveSupport end end - RAILS_CACHE_ID = ENV["RAILS_CACHE_ID"] - RAILS_APP_VERION = ENV["RAILS_APP_VERION"] - EXPANDED_CACHE = RAILS_CACHE_ID || RAILS_APP_VERION - def self.expand_cache_key(key, namespace = nil) expanded_cache_key = namespace ? "#{namespace}/" : "" - if EXPANDED_CACHE - expanded_cache_key << "#{RAILS_CACHE_ID || RAILS_APP_VERION}/" + prefix = ENV["RAILS_CACHE_ID"] || ENV["RAILS_APP_VERSION"] + if prefix + expanded_cache_key << "#{prefix}/" end expanded_cache_key << @@ -92,26 +99,75 @@ module ActiveSupport # ActiveSupport::Cache::MemCacheStore. MemCacheStore is currently the most # popular cache store for large production websites. # - # ActiveSupport::Cache::Store is meant for caching strings. Some cache - # store implementations, like MemoryStore, are able to cache arbitrary - # Ruby objects, but don't count on every cache store to be able to do that. + # Some implementations may not support all methods beyond the basic cache + # methods of +fetch+, +write+, +read+, +exist?+, and +delete+. + # + # ActiveSupport::Cache::Store can store any serializable Ruby object. # # cache = ActiveSupport::Cache::MemoryStore.new # # cache.read("city") # => nil # cache.write("city", "Duckburgh") # cache.read("city") # => "Duckburgh" + # + # Keys are always translated into Strings and are case sensitive. When an + # object is specified as a key, its +cache_key+ method will be called if it + # is defined. Otherwise, the +to_param+ method will be called. Hashes and + # Arrays can be used as keys. The elements will be delimited by slashes + # and Hashes elements will be sorted by key so they are consistent. + # + # cache.read("city") == cache.read(:city) # => true + # + # Nil values can be cached. + # + # If your cache is on a shared infrastructure, you can define a namespace for + # your cache entries. If a namespace is defined, it will be prefixed on to every + # key. The namespace can be either a static value or a Proc. If it is a Proc, it + # will be invoked when each key is evaluated so that you can use application logic + # to invalidate keys. + # + # cache.namespace = lambda { @last_mod_time } # Set the namespace to a variable + # @last_mod_time = Time.now # Invalidate the entire cache by changing namespace + # + # All caches support auto expiring content after a specified number of seconds. + # To set the cache entry time to live, you can either specify +:expires_in+ as + # an option to the constructor to have it affect all entries or to the +fetch+ + # or +write+ methods for just one entry. + # + # cache = ActiveSupport::Cache::MemoryStore.new(:expire_in => 5.minutes) + # cache.write(key, value, :expire_in => 1.minute) # Set a lower value for one entry + # + # Caches can also store values in a compressed format to save space and reduce + # time spent sending data. Since there is some overhead, values must be large + # enough to warrant compression. To turn on compression either pass + # :compress => true in the initializer or to +fetch+ or +write+. + # To specify the threshold at which to compress values, set + # :compress_threshold. The default threshold is 32K. class Store - cattr_accessor :logger, :instance_writter => false + + cattr_accessor :logger, :instance_writer => true attr_reader :silence alias :silence? :silence + # Create a new cache. The options will be passed to any write method calls except + # for :namespace which can be used to set the global namespace for the cache. + def initialize (options = nil) + @options = options ? options.dup : {} + end + + # Get the default options set when the cache was created. + def options + @options ||= {} + end + + # Silence the logger. def silence! @silence = true self end + # Silence the logger within a block. def mute previous_silence, @silence = defined?(@silence) && @silence, true yield @@ -152,28 +208,85 @@ module ActiveSupport # cache.write("today", "Monday") # cache.fetch("today", :force => true) # => nil # + # Setting :compress will store a large cache entry set by the call + # in a compressed format. + # + # Setting :expires_in will set an expiration time on the cache + # entry if it is set by call. + # + # Setting :race_condition_ttl will invoke logic on entries set with + # an :expires_in option. If an entry is found in the cache that is + # expired and it has been expired for less than the number of seconds specified + # by this option and a block was passed to the method call, then the expiration + # future time of the entry in the cache will be updated to that many seconds + # in the and the block will be evaluated and written to the cache. + # + # This is very useful in situations where a cache entry is used very frequently + # under heavy load. The first process to find an expired cache entry will then + # become responsible for regenerating that entry while other processes continue + # to use the slightly out of date entry. This can prevent race conditions where + # too many processes are trying to regenerate the entry all at once. If the + # process regenerating the entry errors out, the entry will be regenerated + # after the specified number of seconds. + # + # # Set all values to expire after one minute. + # cache = ActiveSupport::Cache::MemoryCache.new(:expires_in => 1.minute) + # + # cache.write("foo", "original value") + # val_1 = nil + # val_2 = nil + # sleep 60 + # + # Thread.new do + # val_1 = cache.fetch("foo", :race_condition_ttl => 10) do + # sleep 1 + # "new value 1" + # end + # end + # + # Thread.new do + # val_2 = cache.fetch("foo", :race_condition_ttl => 10) do + # "new value 2" + # end + # end + # + # # val_1 => "new value 1" + # # val_2 => "original value" + # # cache.fetch("foo") => "new value 1" + # # Other options will be handled by the specific cache store implementation. - # Internally, #fetch calls #read, and calls #write on a cache miss. + # Internally, #fetch calls #read_entry, and calls #write_entry on a cache miss. # +options+ will be passed to the #read and #write calls. # - # For example, MemCacheStore's #write method supports the +:expires_in+ - # option, which tells the memcached server to automatically expire the - # cache item after a certain period. This options is also supported by - # FileStore's #read method. We can use this option with #fetch too: + # For example, MemCacheStore's #write method supports the +:raw+ + # option, which tells the memcached server to store all values as strings. + # We can use this option with #fetch too: # # cache = ActiveSupport::Cache::MemCacheStore.new - # cache.fetch("foo", :force => true, :expires_in => 5.seconds) do - # "bar" + # cache.fetch("foo", :force => true, :raw => true) do + # :bar # end # cache.fetch("foo") # => "bar" - # sleep(6) - # cache.fetch("foo") # => nil - def fetch(key, options = {}, &block) - if !options[:force] && value = read(key, options) - value + def fetch(name, options = nil, &block) + options = merged_options(options) + key = namespaced_key(name, options) + entry = instrument(:read, name, options) { read_entry(key, options) } unless options[:force] + if entry && entry.expired? + race_ttl = options[:race_condition_ttl].to_f + if race_ttl and Time.now.to_f - entry.expires_at <= race_ttl + entry.expires_at = Time.now + race_ttl + write_entry(key, entry, :expires_in => race_ttl * 2) + else + delete_entry(key, options) + end + entry = nil + end + + if entry + entry.value elsif block_given? - result = instrument(:generate, key, options, &block) - write(key, result, options) + result = instrument(:generate, name, options, &block) + write(name, result, options) result end end @@ -182,15 +295,47 @@ module ActiveSupport # the cache with the given key, then that data is returned. Otherwise, # nil is returned. # - # You may also specify additional options via the +options+ argument. - # The specific cache store implementation will decide what to do with - # +options+. + # Options are passed to the underlying cache implementation. + def read(name, options = nil) + options = merged_options(options) + key = namespaced_key(name, options) + instrument(:read, name, options) do + entry = read_entry(key, options) + if entry + if entry.expired? + delete_entry(key, options) + nil + else + entry.value + end + else + nil + end + end + end + + # Read multiple values at once from the cache. Options can be passed + # in the last argument. # - # For example, FileStore supports the +:expires_in+ option, which - # makes the method return nil for cache items older than the specified - # period. - def read(key, options = nil, &block) - instrument(:read, key, options, &block) + # Some cache implementation may optimize this method. + # + # Returns a hash mapping the names provided to the values found. + def read_multi(*names) + options = names.extract_options! + options = merged_options(options) + results = {} + names.each do |name| + key = namespaced_key(name, options) + entry = read_entry(key, options) + if entry + if entry.expired? + delete_entry(key) + else + results[name] = entry.value + end + end + end + results end # Writes the given value to the cache, with the given key. @@ -198,56 +343,160 @@ module ActiveSupport # You may also specify additional options via the +options+ argument. # The specific cache store implementation will decide what to do with # +options+. + def write(name, value, options = nil) + options = merged_options(options) + instrument(:write, name, options) do + entry = Entry.new(value, options) + write_entry(namespaced_key(name, options), entry, options) + end + end + + # Delete an entry in the cache. Returns +true+ if there was an entry to delete. # - # For example, MemCacheStore supports the +:expires_in+ option, which - # tells the memcached server to automatically expire the cache item after - # a certain period: + # Options are passed to the underlying cache implementation. + def delete(name, options = nil) + options = merged_options(options) + instrument(:delete, name) do + delete_entry(namespaced_key(name, options), options) + end + end + + # Return true if the cache contains an entry with this name. # - # cache = ActiveSupport::Cache::MemCacheStore.new - # cache.write("foo", "bar", :expires_in => 5.seconds) - # cache.read("foo") # => "bar" - # sleep(6) - # cache.read("foo") # => nil - def write(key, value, options = nil, &block) - instrument(:write, key, options, &block) + # Options are passed to the underlying cache implementation. + def exist?(name, options = nil) + options = merged_options(options) + instrument(:exist?, name) do + entry = read_entry(namespaced_key(name, options), options) + if entry && !entry.expired? + true + else + false + end + end end - def delete(key, options = nil, &block) - instrument(:delete, key, options, &block) + # Delete all entries whose keys match a pattern. + # + # Options are passed to the underlying cache implementation. + # + # Not all implementations may support +delete_matched+. + def delete_matched(matcher, options = nil) + raise NotImplementedError.new("#{self.class.name} does not support delete_matched") end - def delete_matched(matcher, options = nil, &block) - instrument(:delete_matched, matcher.inspect, options, &block) + # Increment an integer value in the cache. + # + # Options are passed to the underlying cache implementation. + # + # Not all implementations may support +delete_matched+. + def increment(name, amount = 1, options = nil) + raise NotImplementedError.new("#{self.class.name} does not support increment") end - def exist?(key, options = nil, &block) - instrument(:exist?, key, options, &block) + # Increment an integer value in the cache. + # + # Options are passed to the underlying cache implementation. + # + # Not all implementations may support +delete_matched+. + def decrement(name, amount = 1, options = nil) + raise NotImplementedError.new("#{self.class.name} does not support decrement") end - def increment(key, amount = 1) - if num = read(key) - write(key, num + amount) - else - nil - end + # Cleanup the cache by removing expired entries. Not all cache implementations may + # support this method. + # + # Options are passed to the underlying cache implementation. + # + # Not all implementations may support +delete_matched+. + def cleanup(options = nil) + raise NotImplementedError.new("#{self.class.name} does not support cleanup") end - def decrement(key, amount = 1) - if num = read(key) - write(key, num - amount) - else - nil - end + # Clear the entire cache. Not all cache implementations may support this method. + # You should be careful with this method since it could affect other processes + # if you are using a shared cache. + # + # Options are passed to the underlying cache implementation. + # + # Not all implementations may support +delete_matched+. + def clear(options = nil) + raise NotImplementedError.new("#{self.class.name} does not support clear") end + protected + # Add the namespace defined in the options to a pattern designed to match keys. + # Implementations that support delete_matched should call this method to translate + # a pattern that matches names into one that matches namespaced keys. + def key_matcher(pattern, options) + prefix = options[:namespace].is_a?(Proc) ? options[:namespace].call : options[:namespace] + if prefix + source = pattern.source + if source.start_with?('^') + source = source[1, source.length] + else + source = ".*#{source[0, source.length]}" + end + Regexp.new("^#{Regexp.escape(prefix)}:#{source}", pattern.options) + else + pattern + end + end + + # Read an entry from the cache implementation. Subclasses must implement this method. + def read_entry(key, options) # :nodoc: + raise NotImplementedError.new + end + + # Write an entry to the cache implementation. Subclasses must implement this method. + def write_entry(key, entry, options) # :nodoc: + raise NotImplementedError.new + end + + # Delete an entry from the cache implementation. Subclasses must implement this method. + def delete_entry(key, options) # :nodoc: + raise NotImplementedError.new + end + private - def expires_in(options) - expires_in = options && options[:expires_in] - raise ":expires_in must be a number" if expires_in && !expires_in.is_a?(Numeric) - expires_in || 0 + # Merge the default options with ones specific to a method call. + def merged_options(call_options) # :nodoc: + if call_options + options.merge(call_options) + else + options.dup + end + end + + # Expand a key to be a consistent string value. If the object responds to +cache_key+, + # it will be called. Otherwise, the to_param method will be called. If the key is a + # Hash, the keys will be sorted alphabetically. + def expanded_key(key) # :nodoc: + if key.respond_to?(:cache_key) + key = key.cache_key.to_s + elsif key.is_a?(Array) + if key.size > 1 + key.collect{|element| expanded_key(element)}.to_param + else + key.first.to_param + end + elsif key.is_a?(Hash) + key = key.to_a.sort{|a,b| a.first.to_s <=> b.first.to_s}.collect{|k,v| "#{k}=#{v}"}.to_param + else + key = key.to_param + end + end + + # Prefix a key with the namespace. The two values will be delimited with a colon. + def namespaced_key(key, options) + key = expanded_key(key) + namespace = options[:namespace] if options + prefix = namespace.is_a?(Proc) ? namespace.call : namespace + key = "#{prefix}:#{key}" if prefix + key end - def instrument(operation, key, options) + def instrument(operation, key, options = nil) log(operation, key, options) if self.class.instrument @@ -259,9 +508,118 @@ module ActiveSupport end end - def log(operation, key, options) - return unless logger && !silence? - logger.debug("Cache #{operation}: #{key}#{options ? " (#{options.inspect})" : ""}") + def log(operation, key, options = nil) + return unless logger && logger.debug? && !silence? + logger.debug("Cache #{operation}: #{key}#{options.blank? ? "" : " (#{options.inspect})"}") + end + end + + # Entry that is put into caches. It supports expiration time on entries and can compress values + # to save space in the cache. + class Entry + attr_reader :created_at, :expires_in + + DEFAULT_COMPRESS_LIMIT = 16.kilobytes + + class << self + # Create an entry with internal attributes set. This method is intended to be + # used by implementations that store cache entries in a native format instead + # of as serialized Ruby objects. + def create (raw_value, created_at, options = {}) + entry = new(nil) + entry.instance_variable_set(:@value, raw_value) + entry.instance_variable_set(:@created_at, created_at.to_f) + entry.instance_variable_set(:@compressed, !!options[:compressed]) + entry.instance_variable_set(:@expires_in, options[:expires_in]) + entry + end + end + + # Create a new cache entry for the specified value. Options supported are + # +:compress+, +:compress_threshold+, and +:expires_in+. + def initialize(value, options = {}) + @compressed = false + @expires_in = options[:expires_in] + @expires_in = @expires_in.to_f if @expires_in + @created_at = Time.now.to_f + if value + if should_compress?(value, options) + @value = Zlib::Deflate.deflate(Marshal.dump(value)) + @compressed = true + else + @value = value + end + else + @value = nil + end + end + + # Get the raw value. This value may be serialized and compressed. + def raw_value + @value + end + + # Get the value stored in the cache. + def value + if @value + val = compressed? ? Marshal.load(Zlib::Inflate.inflate(@value)) : @value + unless val.frozen? + val.freeze rescue nil + end + val + end + end + + def compressed? + @compressed + end + + # Check if the entry is expired. The +expires_in+ parameter can override the + # value set when the entry was created. + def expired? + if @expires_in && @created_at + @expires_in <= Time.now.to_f + true + else + false + end + end + + # Set a new time to live on the entry so it expires at the given time. + def expires_at=(time) + if time + @expires_in = time.to_f - @created_at + else + @expires_in = nil + end + end + + # Seconds since the epoch when the cache entry will expire. + def expires_at + @expires_in ? @created_at + @expires_in : nil + end + + # Get the size of the cached value. This could be less than value.size + # if the data is compressed. + def size + if @value.nil? + 0 + elsif @value.respond_to?(:bytesize) + @value.bytesize + else + Marshal.dump(@value).bytesize + end + end + + private + def should_compress?(value, options) + if options[:compress] && value + unless value.is_a?(Numeric) + compress_threshold = options[:compress_threshold] || DEFAULT_COMPRESS_LIMIT + serialized_value = value.is_a?(String) ? value : Marshal.dump(value) + return true if serialized_value.size >= compress_threshold + end + end + false end end end -- cgit v1.2.3