diff options
author | Joshua Peek <josh@joshpeek.com> | 2009-09-13 23:30:32 -0500 |
---|---|---|
committer | Joshua Peek <josh@joshpeek.com> | 2009-09-13 23:30:32 -0500 |
commit | a42c8958f99d9cc5f8be844098dfb762f72dba7a (patch) | |
tree | e7bb4f659ceeab2e7ae1f5dfb5c9db23d8eda090 | |
parent | b2f0b8cbda74cc89834b2db749fb0fbe44f5d8f2 (diff) | |
download | rails-a42c8958f99d9cc5f8be844098dfb762f72dba7a.tar.gz rails-a42c8958f99d9cc5f8be844098dfb762f72dba7a.tar.bz2 rails-a42c8958f99d9cc5f8be844098dfb762f72dba7a.zip |
Bump vendored memcache to 1.7.5
-rw-r--r-- | activesupport/lib/active_support/vendor/memcache-client-1.7.5/lib/memcache.rb (renamed from activesupport/lib/active_support/vendor/memcache-client-1.6.5/lib/memcache.rb) | 385 |
1 files changed, 292 insertions, 93 deletions
diff --git a/activesupport/lib/active_support/vendor/memcache-client-1.6.5/lib/memcache.rb b/activesupport/lib/active_support/vendor/memcache-client-1.7.5/lib/memcache.rb index 4d594c2089..3adc172a8f 100644 --- a/activesupport/lib/active_support/vendor/memcache-client-1.6.5/lib/memcache.rb +++ b/activesupport/lib/active_support/vendor/memcache-client-1.7.5/lib/memcache.rb @@ -2,9 +2,9 @@ $TESTING = defined?($TESTING) && $TESTING require 'socket' require 'thread' -require 'timeout' require 'zlib' require 'digest/sha1' +require 'net/protocol' ## # A Ruby client library for memcached. @@ -15,18 +15,22 @@ class MemCache ## # The version of MemCache you are using. - VERSION = '1.6.4.99' + VERSION = '1.7.5' ## # Default options for the cache object. DEFAULT_OPTIONS = { - :namespace => nil, - :readonly => false, - :multithread => true, - :failover => true, - :timeout => 0.5, - :logger => nil, + :namespace => nil, + :readonly => false, + :multithread => true, + :failover => true, + :timeout => 0.5, + :logger => nil, + :no_reply => false, + :check_size => true, + :autofix_keys => false, + :namespace_separator => ':', } ## @@ -50,6 +54,19 @@ class MemCache attr_reader :multithread ## + # Whether to try to fix keys that are too long and will be truncated by + # using their SHA1 hash instead. + # The hash is only used on keys longer than 250 characters, or containing spaces, + # to avoid impacting performance unnecesarily. + # + # In theory, your code should generate correct keys when calling memcache, + # so it's your responsibility and you should try to fix this problem at its source. + # + # But if that's not possible, enable this option and memcache-client will give you a hand. + + attr_reader :autofix_keys + + ## # The servers this client talks to. Play at your own peril. attr_reader :servers @@ -72,19 +89,32 @@ class MemCache attr_reader :logger ## + # Don't send or look for a reply from the memcached server for write operations. + # Please note this feature only works in memcached 1.2.5 and later. Earlier + # versions will reply with "ERROR". + attr_reader :no_reply + + ## # Accepts a list of +servers+ and a list of +opts+. +servers+ may be # omitted. See +servers=+ for acceptable server list arguments. # # Valid options for +opts+ are: # - # [:namespace] Prepends this value to all keys added or retrieved. - # [:readonly] Raises an exception on cache writes when true. - # [:multithread] Wraps cache access in a Mutex for thread safety. - # [:failover] Should the client try to failover to another server if the - # first server is down? Defaults to true. - # [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec, - # set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8). - # [:logger] Logger to use for info/debug output, defaults to nil + # [:namespace] Prepends this value to all keys added or retrieved. + # [:readonly] Raises an exception on cache writes when true. + # [:multithread] Wraps cache access in a Mutex for thread safety. Defaults to true. + # [:failover] Should the client try to failover to another server if the + # first server is down? Defaults to true. + # [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec, + # set to nil to disable timeouts. + # [:logger] Logger to use for info/debug output, defaults to nil + # [:no_reply] Don't bother looking for a reply for write operations (i.e. they + # become 'fire and forget'), memcached 1.2.5 and later only, speeds up + # set/add/delete/incr/decr significantly. + # [:check_size] Raises a MemCacheError if the value to be set is greater than 1 MB, which + # is the maximum key size for the standard memcached server. Defaults to true. + # [:autofix_keys] If a key is longer than 250 characters or contains spaces, + # use an SHA1 hash instead, to prevent collisions on truncated keys. # Other options are ignored. def initialize(*args) @@ -108,13 +138,17 @@ class MemCache end opts = DEFAULT_OPTIONS.merge opts - @namespace = opts[:namespace] - @readonly = opts[:readonly] - @multithread = opts[:multithread] - @timeout = opts[:timeout] - @failover = opts[:failover] - @logger = opts[:logger] - @mutex = Mutex.new if @multithread + @namespace = opts[:namespace] + @readonly = opts[:readonly] + @multithread = opts[:multithread] + @autofix_keys = opts[:autofix_keys] + @timeout = opts[:timeout] + @failover = opts[:failover] + @logger = opts[:logger] + @no_reply = opts[:no_reply] + @check_size = opts[:check_size] + @namespace_separator = opts[:namespace_separator] + @mutex = Mutex.new if @multithread logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger @@ -192,8 +226,8 @@ class MemCache def get(key, raw = false) with_server(key) do |server, cache_key| + logger.debug { "get #{key} from #{server.inspect}" } if logger value = cache_get server, cache_key - logger.debug { "GET #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger return nil if value.nil? value = Marshal.load value unless raw return value @@ -203,6 +237,25 @@ class MemCache end ## + # Performs a +get+ with the given +key+. If + # the value does not exist and a block was given, + # the block will be called and the result saved via +add+. + # + # If you do not provide a block, using this + # method is the same as using +get+. + # + def fetch(key, expiry = 0, raw = false) + value = get(key, raw) + + if value.nil? && block_given? + value = yield + add(key, value, expiry, raw) + end + + value + end + + ## # Retrieves multiple values from memcached in parallel, if possible. # # The memcached protocol supports the ability to retrieve multiple @@ -280,18 +333,64 @@ class MemCache def set(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly + + value = Marshal.dump value unless raw with_server(key) do |server, cache_key| + logger.debug { "set #{key} to #{server.inspect}: #{value.to_s.size}" } if logger - value = Marshal.dump value unless raw - logger.debug { "SET #{key} to #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger + if @check_size && value.to_s.size > ONE_MB + raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" + end - data = value.to_s - raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB + command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n" - command = "set #{cache_key} 0 #{expiry} #{data.size}\r\n#{data}\r\n" + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + + if result.nil? + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end + + result + end + end + end + + ## + # "cas" is a check and set operation which means "store this data but + # only if no one else has updated since I last fetched it." This can + # be used as a form of optimistic locking. + # + # Works in block form like so: + # cache.cas('some-key') do |value| + # value + 1 + # end + # + # Returns: + # +nil+ if the value was not found on the memcached server. + # +STORED+ if the value was updated successfully + # +EXISTS+ if the value was updated by someone else since last fetch + + def cas(key, expiry=0, raw=false) + raise MemCacheError, "Update of readonly cache" if @readonly + raise MemCacheError, "A block is required" unless block_given? + + (value, token) = gets(key, raw) + return nil unless value + updated = yield value + value = Marshal.dump updated unless raw + + with_server(key) do |server, cache_key| + logger.debug { "cas #{key} to #{server.inspect}: #{value.to_s.size}" } if logger + command = "cas #{cache_key} 0 #{expiry} #{value.to_s.size} #{token}#{noreply}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command + break nil if @no_reply result = socket.gets raise_on_error_response! result @@ -311,17 +410,79 @@ class MemCache # If +raw+ is true, +value+ will not be Marshalled. # # Readers should call this method in the event of a cache miss, not - # MemCache#set or MemCache#[]=. + # MemCache#set. def add(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly + value = Marshal.dump value unless raw + with_server(key) do |server, cache_key| + logger.debug { "add #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + result + end + end + end + + ## + # Add +key+ to the cache with value +value+ that expires in +expiry+ + # seconds, but only if +key+ already exists in the cache. + # If +raw+ is true, +value+ will not be Marshalled. + def replace(key, value, expiry = 0, raw = false) + raise MemCacheError, "Update of readonly cache" if @readonly + value = Marshal.dump value unless raw + with_server(key) do |server, cache_key| + logger.debug { "replace #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "replace #{cache_key} 0 #{expiry} #{value.to_s.size}#{noreply}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + result + end + end + end + + ## + # Append - 'add this data to an existing key after existing data' + # Please note the value is always passed to memcached as raw since it + # doesn't make a lot of sense to concatenate marshalled data together. + def append(key, value) + raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| - value = Marshal.dump value unless raw - logger.debug { "ADD #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger - command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" + logger.debug { "append #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "append #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command + break nil if @no_reply + result = socket.gets + raise_on_error_response! result + result + end + end + end + + ## + # Prepend - 'add this data to an existing key before existing data' + # Please note the value is always passed to memcached as raw since it + # doesn't make a lot of sense to concatenate marshalled data together. + def prepend(key, value) + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| + logger.debug { "prepend #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "prepend #{cache_key} 0 0 #{value.to_s.size}#{noreply}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + break nil if @no_reply result = socket.gets raise_on_error_response! result result @@ -336,7 +497,9 @@ class MemCache raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| with_socket_management(server) do |socket| - socket.write "delete #{cache_key} #{expiry}\r\n" + logger.debug { "delete #{cache_key} on #{server}" } if logger + socket.write "delete #{cache_key} #{expiry}#{noreply}\r\n" + break nil if @no_reply result = socket.gets raise_on_error_response! result result @@ -346,19 +509,33 @@ class MemCache ## # Flush the cache from all memcache servers. - - def flush_all + # A non-zero value for +delay+ will ensure that the flush + # is propogated slowly through your memcached server farm. + # The Nth server will be flushed N*delay seconds from now, + # asynchronously so this method returns quickly. + # This prevents a huge database spike due to a total + # flush all at once. + + def flush_all(delay=0) raise MemCacheError, 'No active servers' unless active? raise MemCacheError, "Update of readonly cache" if @readonly begin + delay_time = 0 @servers.each do |server| with_socket_management(server) do |socket| - socket.write "flush_all\r\n" + logger.debug { "flush_all #{delay_time} on #{server}" } if logger + if delay == 0 # older versions of memcached will fail silently otherwise + socket.write "flush_all#{noreply}\r\n" + else + socket.write "flush_all #{delay_time}#{noreply}\r\n" + end + break nil if @no_reply result = socket.gets raise_on_error_response! result result end + delay_time += delay end rescue IndexError => err handle_error nil, err @@ -466,10 +643,14 @@ class MemCache # requested. def make_cache_key(key) + if @autofix_keys and (key =~ /\s/ or (key.length + (namespace.nil? ? 0 : namespace.length)) > 250) + key = "#{Digest::SHA1.hexdigest(key)}-autofixed" + end + if namespace.nil? then key else - "#{@namespace}:#{key}" + "#{@namespace}#{@namespace_separator}#{key}" end end @@ -500,7 +681,7 @@ class MemCache break unless failover hkey = hash_for "#{try}#{key}" end - + raise MemCacheError, "No servers available" end @@ -510,7 +691,8 @@ class MemCache def cache_decr(server, cache_key, amount) with_socket_management(server) do |socket| - socket.write "decr #{cache_key} #{amount}\r\n" + socket.write "decr #{cache_key} #{amount}#{noreply}\r\n" + break nil if @no_reply text = socket.gets raise_on_error_response! text return nil if text == "NOT_FOUND\r\n" @@ -546,6 +728,38 @@ class MemCache end end + def gets(key, raw = false) + with_server(key) do |server, cache_key| + logger.debug { "gets #{key} from #{server.inspect}" } if logger + result = with_socket_management(server) do |socket| + socket.write "gets #{cache_key}\r\n" + keyline = socket.gets # "VALUE <key> <flags> <bytes> <cas token>\r\n" + + if keyline.nil? then + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end + + raise_on_error_response! keyline + return nil if keyline == "END\r\n" + + unless keyline =~ /(\d+) (\w+)\r/ then + server.close + raise MemCacheError, "unexpected response #{keyline.inspect}" + end + value = socket.read $1.to_i + socket.read 2 # "\r\n" + socket.gets # "END\r\n" + [value, $2] + end + result[0] = Marshal.load result[0] unless raw + result + end + rescue TypeError => err + handle_error nil, err + end + + ## # Fetches +cache_keys+ from +server+ using a multi-get. @@ -579,7 +793,8 @@ class MemCache def cache_incr(server, cache_key, amount) with_socket_management(server) do |socket| - socket.write "incr #{cache_key} #{amount}\r\n" + socket.write "incr #{cache_key} #{amount}#{noreply}\r\n" + break nil if @no_reply text = socket.gets raise_on_error_response! text return nil if text == "NOT_FOUND\r\n" @@ -617,7 +832,7 @@ class MemCache block.call(socket) - rescue SocketError => err + rescue SocketError, Errno::EAGAIN, Timeout::Error => err logger.warn { "Socket failure: #{err.message}" } if logger server.mark_dead(err) handle_error(server, err) @@ -659,6 +874,10 @@ class MemCache raise new_error end + def noreply + @no_reply ? ' noreply' : '' + end + ## # Performs setup for making a request with +key+ from memcached. Returns # the server to fetch the key from and the complete key to use. @@ -713,13 +932,6 @@ class MemCache class Server ## - # The amount of time to wait to establish a connection with a memcached - # server. If a connection cannot be established within this time limit, - # the server will be marked as down. - - CONNECT_TIMEOUT = 0.25 - - ## # The amount of time to wait before attempting to re-establish a # connection with a server that is marked dead. @@ -802,14 +1014,11 @@ class MemCache # Attempt to connect if not already connected. begin - @sock = @timeout ? TCPTimeoutSocket.new(@host, @port, @timeout) : TCPSocket.new(@host, @port) - - if Socket.constants.include? 'TCP_NODELAY' then - @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 - end + @sock = connect_to(@host, @port, @timeout) + @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 @retry = nil @status = 'CONNECTED' - rescue SocketError, SystemCallError, IOError, Timeout::Error => err + rescue SocketError, SystemCallError, IOError => err logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger mark_dead err end @@ -817,6 +1026,12 @@ class MemCache return @sock end + def connect_to(host, port, timeout=nil) + io = MemCache::BufferedIO.new(TCPSocket.new(host, port)) + io.read_timeout = timeout + io + end + ## # Close the connection to the memcached server targeted by this # object. The server is not considered dead. @@ -848,51 +1063,33 @@ class MemCache class MemCacheError < RuntimeError; end -end - -# TCPSocket facade class which implements timeouts. -class TCPTimeoutSocket - - def initialize(host, port, timeout) - Timeout::timeout(MemCache::Server::CONNECT_TIMEOUT, SocketError) do - @sock = TCPSocket.new(host, port) - @len = timeout - end - end - - def write(*args) - Timeout::timeout(@len, SocketError) do - @sock.write(*args) + class BufferedIO < Net::BufferedIO # :nodoc: + BUFSIZE = 1024 * 16 + + if RUBY_VERSION < '1.9.1' + def rbuf_fill + begin + @rbuf << @io.read_nonblock(BUFSIZE) + rescue Errno::EWOULDBLOCK + retry unless @read_timeout + if IO.select([@io], nil, nil, @read_timeout) + retry + else + raise Timeout::Error, 'IO timeout' + end + end + end end - end - def gets(*args) - Timeout::timeout(@len, SocketError) do - @sock.gets(*args) + def setsockopt(*args) + @io.setsockopt(*args) end - end - def read(*args) - Timeout::timeout(@len, SocketError) do - @sock.read(*args) + def gets + readuntil("\n") end end - def _socket - @sock - end - - def method_missing(meth, *args) - @sock.__send__(meth, *args) - end - - def closed? - @sock.closed? - end - - def close - @sock.close - end end module Continuum @@ -932,4 +1129,6 @@ module Continuum "<#{value}, #{server.host}:#{server.port}>" end end + end +require 'continuum_native'
\ No newline at end of file |