aboutsummaryrefslogtreecommitdiffstats
path: root/activesupport/lib/active_support
diff options
context:
space:
mode:
authorJeremy Kemper <jeremy@bitsweat.net>2009-02-23 19:28:01 -0800
committerJeremy Kemper <jeremy@bitsweat.net>2009-02-23 19:28:01 -0800
commite56b3e4c0b60b2b86f5ca9c5e5a0b22fa34d37ab (patch)
tree2d033b252387d702ce96ad8194e79723eeae96dc /activesupport/lib/active_support
parent45787bdd0e9ec20b111e570a20b5f66a949b400c (diff)
downloadrails-e56b3e4c0b60b2b86f5ca9c5e5a0b22fa34d37ab.tar.gz
rails-e56b3e4c0b60b2b86f5ca9c5e5a0b22fa34d37ab.tar.bz2
rails-e56b3e4c0b60b2b86f5ca9c5e5a0b22fa34d37ab.zip
Update bundled memcache-client from 1.5.0.5 to 1.6.4.99.
See http://www.mikeperham.com/2009/02/15/memcache-client-performance/
Diffstat (limited to 'activesupport/lib/active_support')
-rw-r--r--activesupport/lib/active_support/vendor.rb4
-rw-r--r--activesupport/lib/active_support/vendor/memcache-client-1.6.5/memcache.rb (renamed from activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb)325
2 files changed, 198 insertions, 131 deletions
diff --git a/activesupport/lib/active_support/vendor.rb b/activesupport/lib/active_support/vendor.rb
index cee45cfea1..39da70a9f3 100644
--- a/activesupport/lib/active_support/vendor.rb
+++ b/activesupport/lib/active_support/vendor.rb
@@ -9,9 +9,9 @@ end
require 'builder'
begin
- gem 'memcache-client', '>= 1.5.0.5'
+ gem 'memcache-client', '>= 1.6.5'
rescue Gem::LoadError
- $:.unshift "#{File.dirname(__FILE__)}/vendor/memcache-client-1.5.0.5"
+ $:.unshift "#{File.dirname(__FILE__)}/vendor/memcache-client-1.6.5"
end
begin
diff --git a/activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb b/activesupport/lib/active_support/vendor/memcache-client-1.6.5/memcache.rb
index e90ddf3359..4d594c2089 100644
--- a/activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb
+++ b/activesupport/lib/active_support/vendor/memcache-client-1.6.5/memcache.rb
@@ -1,52 +1,21 @@
-# All original code copyright 2005, 2006, 2007 Bob Cottrell, Eric Hodel,
-# The Robot Co-op. All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions
-# are met:
-#
-# 1. Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-# 2. Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-# 3. Neither the names of the authors nor the names of their contributors
-# may be used to endorse or promote products derived from this software
-# without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS
-# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
-# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY,
-# OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
-# OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
-# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
-# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
-# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
-# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
$TESTING = defined?($TESTING) && $TESTING
require 'socket'
require 'thread'
require 'timeout'
-require 'rubygems'
require 'zlib'
+require 'digest/sha1'
##
# A Ruby client library for memcached.
#
-# This is intended to provide access to basic memcached functionality. It
-# does not attempt to be complete implementation of the entire API, but it is
-# approaching a complete implementation.
class MemCache
##
# The version of MemCache you are using.
- VERSION = '1.5.0.5'
+ VERSION = '1.6.4.99'
##
# Default options for the cache object.
@@ -54,8 +23,10 @@ class MemCache
DEFAULT_OPTIONS = {
:namespace => nil,
:readonly => false,
- :multithread => false,
- :failover => true
+ :multithread => true,
+ :failover => true,
+ :timeout => 0.5,
+ :logger => nil,
}
##
@@ -69,13 +40,6 @@ class MemCache
DEFAULT_WEIGHT = 1
##
- # The amount of time to wait for a response from a memcached server. If a
- # response is not completed within this time, the connection to the server
- # will be closed and an error will be raised.
-
- attr_accessor :request_timeout
-
- ##
# The namespace for this instance
attr_reader :namespace
@@ -91,9 +55,22 @@ class MemCache
attr_reader :servers
##
- # Whether this client should failover reads and writes to another server
+ # Socket timeout limit with this client, defaults to 0.5 sec.
+ # Set to nil to disable timeouts.
+
+ attr_reader :timeout
+
+ ##
+ # Should the client try to failover to another server if the
+ # first server is down? Defaults to true.
+
+ attr_reader :failover
+
+ ##
+ # Log debug/info/warn/error to the given Logger, defaults to nil.
+
+ attr_reader :logger
- attr_accessor :failover
##
# Accepts a list of +servers+ and a list of +opts+. +servers+ may be
# omitted. See +servers=+ for acceptable server list arguments.
@@ -103,7 +80,11 @@ class MemCache
# [:namespace] Prepends this value to all keys added or retrieved.
# [:readonly] Raises an exception on cache writes when true.
# [:multithread] Wraps cache access in a Mutex for thread safety.
- #
+ # [:failover] Should the client try to failover to another server if the
+ # first server is down? Defaults to true.
+ # [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec,
+ # set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8).
+ # [:logger] Logger to use for info/debug output, defaults to nil
# Other options are ignored.
def initialize(*args)
@@ -130,9 +111,15 @@ class MemCache
@namespace = opts[:namespace]
@readonly = opts[:readonly]
@multithread = opts[:multithread]
- @failover = opts[:failover]
+ @timeout = opts[:timeout]
+ @failover = opts[:failover]
+ @logger = opts[:logger]
@mutex = Mutex.new if @multithread
- @buckets = []
+
+ logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger
+
+ Thread.current[:memcache_client] = self.object_id if !@multithread
+
self.servers = servers
end
@@ -140,8 +127,8 @@ class MemCache
# Returns a string representation of the cache object.
def inspect
- "<MemCache: %d servers, %d buckets, ns: %p, ro: %p>" %
- [@servers.length, @buckets.length, @namespace, @readonly]
+ "<MemCache: %d servers, ns: %p, ro: %p>" %
+ [@servers.length, @namespace, @readonly]
end
##
@@ -162,7 +149,7 @@ class MemCache
# Set the servers that the requests will be distributed between. Entries
# can be either strings of the form "hostname:port" or
# "hostname:port:weight" or MemCache::Server objects.
-
+ #
def servers=(servers)
# Create the server objects.
@servers = Array(servers).collect do |server|
@@ -172,21 +159,17 @@ class MemCache
port ||= DEFAULT_PORT
weight ||= DEFAULT_WEIGHT
Server.new self, host, port, weight
- when Server
- if server.memcache.multithread != @multithread then
- raise ArgumentError, "can't mix threaded and non-threaded servers"
- end
- server
else
- raise TypeError, "cannot convert #{server.class} into MemCache::Server"
+ server
end
end
- # Create an array of server buckets for weight selection of servers.
- @buckets = []
- @servers.each do |server|
- server.weight.times { @buckets.push(server) }
- end
+ logger.debug { "Servers now: #{@servers.inspect}" } if logger
+
+ # There's no point in doing this if there's only one server
+ @continuum = create_continuum_for(@servers) if @servers.size > 1
+
+ @servers
end
##
@@ -210,6 +193,7 @@ class MemCache
def get(key, raw = false)
with_server(key) do |server, cache_key|
value = cache_get server, cache_key
+ logger.debug { "GET #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
return nil if value.nil?
value = Marshal.load value unless raw
return value
@@ -233,6 +217,8 @@ class MemCache
# cache["a"] = 1
# cache["b"] = 2
# cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 }
+ #
+ # Note that get_multi assumes the values are marshalled.
def get_multi(*keys)
raise MemCacheError, 'No active servers' unless active?
@@ -252,15 +238,20 @@ class MemCache
results = {}
server_keys.each do |server, keys_for_server|
- keys_for_server = keys_for_server.join ' '
- values = cache_get_multi server, keys_for_server
- values.each do |key, value|
- results[cache_keys[key]] = Marshal.load value
+ keys_for_server_str = keys_for_server.join ' '
+ begin
+ values = cache_get_multi server, keys_for_server_str
+ values.each do |key, value|
+ results[cache_keys[key]] = Marshal.load value
+ end
+ rescue IndexError => e
+ # Ignore this server and try the others
+ logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger
end
end
return results
- rescue TypeError, IndexError => err
+ rescue TypeError => err
handle_error nil, err
end
@@ -285,12 +276,19 @@ class MemCache
# Warning: Readers should not call this method in the event of a cache miss;
# see MemCache#add.
+ ONE_MB = 1024 * 1024
+
def set(key, value, expiry = 0, raw = false)
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
value = Marshal.dump value unless raw
- command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
+ logger.debug { "SET #{key} to #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger
+
+ data = value.to_s
+ raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB
+
+ command = "set #{cache_key} 0 #{expiry} #{data.size}\r\n#{data}\r\n"
with_socket_management(server) do |socket|
socket.write command
@@ -319,7 +317,8 @@ class MemCache
raise MemCacheError, "Update of readonly cache" if @readonly
with_server(key) do |server, cache_key|
value = Marshal.dump value unless raw
- command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n"
+ logger.debug { "ADD #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger
+ command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n"
with_socket_management(server) do |socket|
socket.write command
@@ -353,7 +352,6 @@ class MemCache
raise MemCacheError, "Update of readonly cache" if @readonly
begin
- @mutex.lock if @multithread
@servers.each do |server|
with_socket_management(server) do |socket|
socket.write "flush_all\r\n"
@@ -364,8 +362,6 @@ class MemCache
end
rescue IndexError => err
handle_error nil, err
- ensure
- @mutex.unlock if @multithread
end
end
@@ -424,7 +420,7 @@ class MemCache
while line = socket.gets do
raise_on_error_response! line
break if line == "END\r\n"
- if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then
+ if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then
name, value = $1, $2
stats[name] = case name
when 'version'
@@ -478,6 +474,14 @@ class MemCache
end
##
+ # Returns an interoperable hash value for +key+. (I think, docs are
+ # sketchy for down servers).
+
+ def hash_for(key)
+ Zlib.crc32(key)
+ end
+
+ ##
# Pick a server to handle the request based on a hash of the key.
def get_server_for_key(key, options = {})
@@ -487,27 +491,17 @@ class MemCache
raise MemCacheError, "No servers available" if @servers.empty?
return @servers.first if @servers.length == 1
- hkey = hash_for key
-
- if @failover
- 20.times do |try|
- server = @buckets[hkey % @buckets.compact.size]
- return server if server.alive?
- hkey += hash_for "#{try}#{key}"
- end
- else
- return @buckets[hkey % @buckets.compact.size]
- end
-
- raise MemCacheError, "No servers available"
- end
+ hkey = hash_for(key)
- ##
- # Returns an interoperable hash value for +key+. (I think, docs are
- # sketchy for down servers).
+ 20.times do |try|
+ entryidx = Continuum.binary_search(@continuum, hkey)
+ server = @continuum[entryidx].server
+ return server if server.alive?
+ break unless failover
+ hkey = hash_for "#{try}#{key}"
+ end
- def hash_for(key)
- (Zlib.crc32(key) >> 16) & 0x7fff
+ raise MemCacheError, "No servers available"
end
##
@@ -608,24 +602,28 @@ class MemCache
# failures (but does still apply to unexpectedly lost connections etc.).
def with_socket_management(server, &block)
+ check_multithread_status!
+
@mutex.lock if @multithread
retried = false
-
+
begin
socket = server.socket
# Raise an IndexError to show this server is out of whack. If were inside
# a with_server block, we'll catch it and attempt to restart the operation.
-
+
raise IndexError, "No connection to server (#{server.status})" if socket.nil?
-
+
block.call(socket)
-
+
rescue SocketError => err
- server.mark_dead(err.message)
+ logger.warn { "Socket failure: #{err.message}" } if logger
+ server.mark_dead(err)
handle_error(server, err)
- rescue MemCacheError, SocketError, SystemCallError, IOError => err
+ rescue MemCacheError, SystemCallError, IOError => err
+ logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger
handle_error(server, err) if retried || socket.nil?
retried = true
retry
@@ -640,8 +638,9 @@ class MemCache
server, cache_key = request_setup(key)
yield server, cache_key
rescue IndexError => e
+ logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger
if !retried && @servers.size > 1
- puts "Connection to server #{server.inspect} DIED! Retrying operation..."
+ logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger
retried = true
retry
end
@@ -677,6 +676,37 @@ class MemCache
end
end
+ def create_continuum_for(servers)
+ total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
+ continuum = []
+
+ servers.each do |server|
+ entry_count_for(server, servers.size, total_weight).times do |idx|
+ hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}")
+ value = Integer("0x#{hash[0..7]}")
+ continuum << Continuum::Entry.new(value, server)
+ end
+ end
+
+ continuum.sort { |a, b| a.value <=> b.value }
+ end
+
+ def entry_count_for(server, total_servers, total_weight)
+ ((total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor
+ end
+
+ def check_multithread_status!
+ return if @multithread
+
+ if Thread.current[:memcache_client] != self.object_id
+ raise MemCacheError, <<-EOM
+ You are accessing this memcache-client instance from multiple threads but have not enabled multithread support.
+ Normally: MemCache.new(['localhost:11211'], :multithread => true)
+ In Rails: config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }]
+ EOM
+ end
+ end
+
##
# This class represents a memcached server instance.
@@ -690,13 +720,6 @@ class MemCache
CONNECT_TIMEOUT = 0.25
##
- # The amount of time to wait for a response from a memcached server.
- # If a response isn't received within this time limit,
- # the server will be marked as down.
-
- SOCKET_TIMEOUT = 0.5
-
- ##
# The amount of time to wait before attempting to re-establish a
# connection with a server that is marked dead.
@@ -727,6 +750,8 @@ class MemCache
attr_reader :status
+ attr_reader :logger
+
##
# Create a new MemCache::Server object for the memcached instance
# listening on the given host and port, weighted by the given weight.
@@ -735,17 +760,15 @@ class MemCache
raise ArgumentError, "No host specified" if host.nil? or host.empty?
raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero?
- @memcache = memcache
@host = host
@port = port.to_i
@weight = weight.to_i
- @multithread = @memcache.multithread
- @mutex = Mutex.new
-
@sock = nil
@retry = nil
@status = 'NOT CONNECTED'
+ @timeout = memcache.timeout
+ @logger = memcache.logger
end
##
@@ -770,7 +793,6 @@ class MemCache
# Returns the connected socket object on success or nil on failure.
def socket
- @mutex.lock if @multithread
return @sock if @sock and not @sock.closed?
@sock = nil
@@ -780,8 +802,7 @@ class MemCache
# Attempt to connect if not already connected.
begin
-
- @sock = TCPTimeoutSocket.new @host, @port
+ @sock = @timeout ? TCPTimeoutSocket.new(@host, @port, @timeout) : TCPSocket.new(@host, @port)
if Socket.constants.include? 'TCP_NODELAY' then
@sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
@@ -789,12 +810,11 @@ class MemCache
@retry = nil
@status = 'CONNECTED'
rescue SocketError, SystemCallError, IOError, Timeout::Error => err
- mark_dead err.message
+ logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger
+ mark_dead err
end
return @sock
- ensure
- @mutex.unlock if @multithread
end
##
@@ -802,24 +822,23 @@ class MemCache
# object. The server is not considered dead.
def close
- @mutex.lock if @multithread
@sock.close if @sock && !@sock.closed?
@sock = nil
@retry = nil
@status = "NOT CONNECTED"
- ensure
- @mutex.unlock if @multithread
end
##
# Mark the server as dead and close its socket.
- def mark_dead(reason = "Unknown error")
+ def mark_dead(error)
@sock.close if @sock && !@sock.closed?
@sock = nil
@retry = Time.now + RETRY_DELAY
- @status = sprintf "%s:%s DEAD: %s, will retry at %s", @host, @port, reason, @retry
+ reason = "#{error.class.name}: #{error.message}"
+ @status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry
+ @logger.info { @status } if @logger
end
end
@@ -833,36 +852,84 @@ end
# TCPSocket facade class which implements timeouts.
class TCPTimeoutSocket
- def initialize(*args)
+
+ def initialize(host, port, timeout)
Timeout::timeout(MemCache::Server::CONNECT_TIMEOUT, SocketError) do
- @sock = TCPSocket.new(*args)
- @len = MemCache::Server::SOCKET_TIMEOUT.to_f || 0.5
+ @sock = TCPSocket.new(host, port)
+ @len = timeout
end
end
-
+
def write(*args)
Timeout::timeout(@len, SocketError) do
@sock.write(*args)
end
end
-
+
def gets(*args)
Timeout::timeout(@len, SocketError) do
@sock.gets(*args)
end
end
-
+
def read(*args)
Timeout::timeout(@len, SocketError) do
@sock.read(*args)
end
end
-
+
def _socket
@sock
end
-
+
def method_missing(meth, *args)
@sock.__send__(meth, *args)
end
-end \ No newline at end of file
+
+ def closed?
+ @sock.closed?
+ end
+
+ def close
+ @sock.close
+ end
+end
+
+module Continuum
+ POINTS_PER_SERVER = 160 # this is the default in libmemcached
+
+ # Find the closest index in Continuum with value <= the given value
+ def self.binary_search(ary, value, &block)
+ upper = ary.size - 1
+ lower = 0
+ idx = 0
+
+ while(lower <= upper) do
+ idx = (lower + upper) / 2
+ comp = ary[idx].value <=> value
+
+ if comp == 0
+ return idx
+ elsif comp > 0
+ upper = idx - 1
+ else
+ lower = idx + 1
+ end
+ end
+ return upper
+ end
+
+ class Entry
+ attr_reader :value
+ attr_reader :server
+
+ def initialize(val, srv)
+ @value = val
+ @server = srv
+ end
+
+ def inspect
+ "<#{value}, #{server.host}:#{server.port}>"
+ end
+ end
+end