diff options
Diffstat (limited to 'activesupport/lib/active_support/vendor')
-rw-r--r-- | activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb (renamed from activesupport/lib/active_support/vendor/memcache-client-1.5.1/memcache.rb) | 461 |
1 files changed, 240 insertions, 221 deletions
diff --git a/activesupport/lib/active_support/vendor/memcache-client-1.5.1/memcache.rb b/activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb index 99c9af0398..e90ddf3359 100644 --- a/activesupport/lib/active_support/vendor/memcache-client-1.5.1/memcache.rb +++ b/activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb @@ -26,36 +26,13 @@ # OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, # EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +$TESTING = defined?($TESTING) && $TESTING require 'socket' require 'thread' require 'timeout' require 'rubygems' - -class String - - ## - # Uses the ITU-T polynomial in the CRC32 algorithm. - - def crc32_ITU_T - n = length - r = 0xFFFFFFFF - - n.times do |i| - r ^= self[i] - 8.times do - if (r & 1) != 0 then - r = (r>>1) ^ 0xEDB88320 - else - r >>= 1 - end - end - end - - r ^ 0xFFFFFFFF - end - -end +require 'zlib' ## # A Ruby client library for memcached. @@ -69,7 +46,7 @@ class MemCache ## # The version of MemCache you are using. - VERSION = '1.5.0' + VERSION = '1.5.0.5' ## # Default options for the cache object. @@ -78,6 +55,7 @@ class MemCache :namespace => nil, :readonly => false, :multithread => false, + :failover => true } ## @@ -113,6 +91,10 @@ class MemCache attr_reader :servers ## + # Whether this client should failover reads and writes to another server + + attr_accessor :failover + ## # Accepts a list of +servers+ and a list of +opts+. +servers+ may be # omitted. See +servers=+ for acceptable server list arguments. # @@ -148,6 +130,7 @@ class MemCache @namespace = opts[:namespace] @readonly = opts[:readonly] @multithread = opts[:multithread] + @failover = opts[:failover] @mutex = Mutex.new if @multithread @buckets = [] self.servers = servers @@ -182,7 +165,7 @@ class MemCache def servers=(servers) # Create the server objects. - @servers = servers.collect do |server| + @servers = Array(servers).collect do |server| case server when String host, port, weight = server.split ':', 3 @@ -212,15 +195,12 @@ class MemCache # 0. +key+ can not be decremented below 0. def decr(key, amount = 1) - server, cache_key = request_setup key - - if @multithread then - threadsafe_cache_decr server, cache_key, amount - else + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| cache_decr server, cache_key, amount end - rescue TypeError, SocketError, SystemCallError, IOError => err - handle_error server, err + rescue TypeError => err + handle_error nil, err end ## @@ -228,21 +208,14 @@ class MemCache # unmarshalled. def get(key, raw = false) - server, cache_key = request_setup key - - value = if @multithread then - threadsafe_cache_get server, cache_key - else - cache_get server, cache_key - end - - return nil if value.nil? - - value = Marshal.load value unless raw - - return value - rescue TypeError, SocketError, SystemCallError, IOError => err - handle_error server, err + with_server(key) do |server, cache_key| + value = cache_get server, cache_key + return nil if value.nil? + value = Marshal.load value unless raw + return value + end + rescue TypeError => err + handle_error nil, err end ## @@ -280,36 +253,29 @@ class MemCache server_keys.each do |server, keys_for_server| keys_for_server = keys_for_server.join ' ' - values = if @multithread then - threadsafe_cache_get_multi server, keys_for_server - else - cache_get_multi server, keys_for_server - end + values = cache_get_multi server, keys_for_server values.each do |key, value| results[cache_keys[key]] = Marshal.load value end end return results - rescue TypeError, SocketError, SystemCallError, IOError => err - handle_error server, err + rescue TypeError, IndexError => err + handle_error nil, err end ## - # Increments the value for +key+ by +amount+ and retruns the new value. + # Increments the value for +key+ by +amount+ and returns the new value. # +key+ must already exist. If +key+ is not an integer, it is assumed to be # 0. def incr(key, amount = 1) - server, cache_key = request_setup key - - if @multithread then - threadsafe_cache_incr server, cache_key, amount - else + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| cache_incr server, cache_key, amount end - rescue TypeError, SocketError, SystemCallError, IOError => err - handle_error server, err + rescue TypeError => err + handle_error nil, err end ## @@ -321,23 +287,23 @@ class MemCache def set(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly - server, cache_key = request_setup key - socket = server.socket + with_server(key) do |server, cache_key| - value = Marshal.dump value unless raw - command = "set #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" + value = Marshal.dump value unless raw + command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" - begin - @mutex.lock if @multithread - socket.write command - result = socket.gets - raise_on_error_response! result - result - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message - ensure - @mutex.unlock if @multithread + with_socket_management(server) do |socket| + socket.write command + result = socket.gets + raise_on_error_response! result + + if result.nil? + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end + + result + end end end @@ -351,23 +317,16 @@ class MemCache def add(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly - server, cache_key = request_setup key - socket = server.socket - - value = Marshal.dump value unless raw - command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" - - begin - @mutex.lock if @multithread - socket.write command - result = socket.gets - raise_on_error_response! result - result - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message - ensure - @mutex.unlock if @multithread + with_server(key) do |server, cache_key| + value = Marshal.dump value unless raw + command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" + + with_socket_management(server) do |socket| + socket.write command + result = socket.gets + raise_on_error_response! result + result + end end end @@ -375,26 +334,15 @@ class MemCache # Removes +key+ from the cache in +expiry+ seconds. def delete(key, expiry = 0) - @mutex.lock if @multithread - - raise MemCacheError, "No active servers" unless active? - cache_key = make_cache_key key - server = get_server_for_key cache_key - - sock = server.socket - raise MemCacheError, "No connection to server" if sock.nil? - - begin - sock.write "delete #{cache_key} #{expiry}\r\n" - result = sock.gets - raise_on_error_response! result - result - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message + raise MemCacheError, "Update of readonly cache" if @readonly + with_server(key) do |server, cache_key| + with_socket_management(server) do |socket| + socket.write "delete #{cache_key} #{expiry}\r\n" + result = socket.gets + raise_on_error_response! result + result + end end - ensure - @mutex.unlock if @multithread end ## @@ -403,21 +351,19 @@ class MemCache def flush_all raise MemCacheError, 'No active servers' unless active? raise MemCacheError, "Update of readonly cache" if @readonly + begin @mutex.lock if @multithread @servers.each do |server| - begin - sock = server.socket - raise MemCacheError, "No connection to server" if sock.nil? - sock.write "flush_all\r\n" - result = sock.gets + with_socket_management(server) do |socket| + socket.write "flush_all\r\n" + result = socket.gets raise_on_error_response! result result - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message end end + rescue IndexError => err + handle_error nil, err ensure @mutex.unlock if @multithread end @@ -469,14 +415,13 @@ class MemCache server_stats = {} @servers.each do |server| - sock = server.socket - raise MemCacheError, "No connection to server" if sock.nil? + next unless server.alive? - value = nil - begin - sock.write "stats\r\n" + with_socket_management(server) do |socket| + value = nil + socket.write "stats\r\n" stats = {} - while line = sock.gets do + while line = socket.gets do raise_on_error_response! line break if line == "END\r\n" if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then @@ -498,12 +443,10 @@ class MemCache end end server_stats["#{server.host}:#{server.port}"] = stats - rescue SocketError, SystemCallError, IOError => err - server.close - raise MemCacheError, err.message end end + raise MemCacheError, "No active servers" if server_stats.empty? server_stats end @@ -520,7 +463,7 @@ class MemCache set key, value end - protected + protected unless $TESTING ## # Create a key for the cache, incorporating the namespace qualifier if @@ -537,7 +480,7 @@ class MemCache ## # Pick a server to handle the request based on a hash of the key. - def get_server_for_key(key) + def get_server_for_key(key, options = {}) raise ArgumentError, "illegal character in key #{key.inspect}" if key =~ /\s/ raise ArgumentError, "key too long #{key.inspect}" if key.length > 250 @@ -545,13 +488,17 @@ class MemCache return @servers.first if @servers.length == 1 hkey = hash_for key - - 20.times do |try| - server = @buckets[hkey % @buckets.nitems] - return server if server.alive? - hkey += hash_for "#{try}#{key}" + + if @failover + 20.times do |try| + server = @buckets[hkey % @buckets.compact.size] + return server if server.alive? + hkey += hash_for "#{try}#{key}" + end + else + return @buckets[hkey % @buckets.compact.size] end - + raise MemCacheError, "No servers available" end @@ -560,7 +507,7 @@ class MemCache # sketchy for down servers). def hash_for(key) - (key.crc32_ITU_T >> 16) & 0x7fff + (Zlib.crc32(key) >> 16) & 0x7fff end ## @@ -568,12 +515,13 @@ class MemCache # found. def cache_decr(server, cache_key, amount) - socket = server.socket - socket.write "decr #{cache_key} #{amount}\r\n" - text = socket.gets - raise_on_error_response! text - return nil if text == "NOT_FOUND\r\n" - return text.to_i + with_socket_management(server) do |socket| + socket.write "decr #{cache_key} #{amount}\r\n" + text = socket.gets + raise_on_error_response! text + return nil if text == "NOT_FOUND\r\n" + return text.to_i + end end ## @@ -581,52 +529,54 @@ class MemCache # miss. def cache_get(server, cache_key) - socket = server.socket - socket.write "get #{cache_key}\r\n" - keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" + with_socket_management(server) do |socket| + socket.write "get #{cache_key}\r\n" + keyline = socket.gets # "VALUE <key> <flags> <bytes>\r\n" - if keyline.nil? then - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" - end + if keyline.nil? then + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" + end - raise_on_error_response! keyline - return nil if keyline == "END\r\n" + raise_on_error_response! keyline + return nil if keyline == "END\r\n" - unless keyline =~ /(\d+)\r/ then - server.close - raise MemCacheError, "unexpected response #{keyline.inspect}" + unless keyline =~ /(\d+)\r/ then + server.close + raise MemCacheError, "unexpected response #{keyline.inspect}" + end + value = socket.read $1.to_i + socket.read 2 # "\r\n" + socket.gets # "END\r\n" + return value end - value = socket.read $1.to_i - socket.read 2 # "\r\n" - socket.gets # "END\r\n" - return value end ## # Fetches +cache_keys+ from +server+ using a multi-get. def cache_get_multi(server, cache_keys) - values = {} - socket = server.socket - socket.write "get #{cache_keys}\r\n" + with_socket_management(server) do |socket| + values = {} + socket.write "get #{cache_keys}\r\n" - while keyline = socket.gets do - return values if keyline == "END\r\n" - raise_on_error_response! keyline + while keyline = socket.gets do + return values if keyline == "END\r\n" + raise_on_error_response! keyline - unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then - server.close - raise MemCacheError, "unexpected response #{keyline.inspect}" + unless keyline =~ /\AVALUE (.+) (.+) (.+)/ then + server.close + raise MemCacheError, "unexpected response #{keyline.inspect}" + end + + key, data_length = $1, $3 + values[$1] = socket.read data_length.to_i + socket.read(2) # "\r\n" end - key, data_length = $1, $3 - values[$1] = socket.read data_length.to_i - socket.read(2) # "\r\n" + server.close + raise MemCacheError, "lost connection to #{server.host}:#{server.port}" # TODO: retry here too end - - server.close - raise MemCacheError, "lost connection to #{server.host}:#{server.port}" end ## @@ -634,18 +584,76 @@ class MemCache # found. def cache_incr(server, cache_key, amount) - socket = server.socket - socket.write "incr #{cache_key} #{amount}\r\n" - text = socket.gets - raise_on_error_response! text - return nil if text == "NOT_FOUND\r\n" - return text.to_i + with_socket_management(server) do |socket| + socket.write "incr #{cache_key} #{amount}\r\n" + text = socket.gets + raise_on_error_response! text + return nil if text == "NOT_FOUND\r\n" + return text.to_i + end + end + + ## + # Gets or creates a socket connected to the given server, and yields it + # to the block, wrapped in a mutex synchronization if @multithread is true. + # + # If a socket error (SocketError, SystemCallError, IOError) or protocol error + # (MemCacheError) is raised by the block, closes the socket, attempts to + # connect again, and retries the block (once). If an error is again raised, + # reraises it as MemCacheError. + # + # If unable to connect to the server (or if in the reconnect wait period), + # raises MemCacheError. Note that the socket connect code marks a server + # dead for a timeout period, so retrying does not apply to connection attempt + # failures (but does still apply to unexpectedly lost connections etc.). + + def with_socket_management(server, &block) + @mutex.lock if @multithread + retried = false + + begin + socket = server.socket + + # Raise an IndexError to show this server is out of whack. If were inside + # a with_server block, we'll catch it and attempt to restart the operation. + + raise IndexError, "No connection to server (#{server.status})" if socket.nil? + + block.call(socket) + + rescue SocketError => err + server.mark_dead(err.message) + handle_error(server, err) + + rescue MemCacheError, SocketError, SystemCallError, IOError => err + handle_error(server, err) if retried || socket.nil? + retried = true + retry + end + ensure + @mutex.unlock if @multithread + end + + def with_server(key) + retried = false + begin + server, cache_key = request_setup(key) + yield server, cache_key + rescue IndexError => e + if !retried && @servers.size > 1 + puts "Connection to server #{server.inspect} DIED! Retrying operation..." + retried = true + retry + end + handle_error(nil, e) + end end ## # Handles +error+ from +server+. def handle_error(server, error) + raise error if error.is_a?(MemCacheError) server.close if server new_error = MemCacheError.new error.message new_error.set_backtrace error.backtrace @@ -660,45 +668,15 @@ class MemCache raise MemCacheError, 'No active servers' unless active? cache_key = make_cache_key key server = get_server_for_key cache_key - raise MemCacheError, 'No connection to server' if server.socket.nil? return server, cache_key end - def threadsafe_cache_decr(server, cache_key, amount) # :nodoc: - @mutex.lock - cache_decr server, cache_key, amount - ensure - @mutex.unlock - end - - def threadsafe_cache_get(server, cache_key) # :nodoc: - @mutex.lock - cache_get server, cache_key - ensure - @mutex.unlock - end - - def threadsafe_cache_get_multi(socket, cache_keys) # :nodoc: - @mutex.lock - cache_get_multi socket, cache_keys - ensure - @mutex.unlock - end - - def threadsafe_cache_incr(server, cache_key, amount) # :nodoc: - @mutex.lock - cache_incr server, cache_key, amount - ensure - @mutex.unlock - end - def raise_on_error_response!(response) - if response =~ /\A(?:CLIENT_|SERVER_)?ERROR (.*)/ + if response =~ /\A(?:CLIENT_|SERVER_)?ERROR(.*)/ raise MemCacheError, $1.strip end end - ## # This class represents a memcached server instance. @@ -712,6 +690,13 @@ class MemCache CONNECT_TIMEOUT = 0.25 ## + # The amount of time to wait for a response from a memcached server. + # If a response isn't received within this time limit, + # the server will be marked as down. + + SOCKET_TIMEOUT = 0.5 + + ## # The amount of time to wait before attempting to re-establish a # connection with a server that is marked dead. @@ -795,9 +780,9 @@ class MemCache # Attempt to connect if not already connected. begin - @sock = timeout CONNECT_TIMEOUT do - TCPSocket.new @host, @port - end + + @sock = TCPTimeoutSocket.new @host, @port + if Socket.constants.include? 'TCP_NODELAY' then @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 end @@ -826,8 +811,6 @@ class MemCache @mutex.unlock if @multithread end - private - ## # Mark the server as dead and close its socket. @@ -836,8 +819,9 @@ class MemCache @sock = nil @retry = Time.now + RETRY_DELAY - @status = sprintf "DEAD: %s, will retry at %s", reason, @retry + @status = sprintf "%s:%s DEAD: %s, will retry at %s", @host, @port, reason, @retry end + end ## @@ -847,3 +831,38 @@ class MemCache end +# TCPSocket facade class which implements timeouts. +class TCPTimeoutSocket + def initialize(*args) + Timeout::timeout(MemCache::Server::CONNECT_TIMEOUT, SocketError) do + @sock = TCPSocket.new(*args) + @len = MemCache::Server::SOCKET_TIMEOUT.to_f || 0.5 + end + end + + def write(*args) + Timeout::timeout(@len, SocketError) do + @sock.write(*args) + end + end + + def gets(*args) + Timeout::timeout(@len, SocketError) do + @sock.gets(*args) + end + end + + def read(*args) + Timeout::timeout(@len, SocketError) do + @sock.read(*args) + end + end + + def _socket + @sock + end + + def method_missing(meth, *args) + @sock.__send__(meth, *args) + end +end
\ No newline at end of file |