diff options
author | Pratik Naik <pratiknaik@gmail.com> | 2009-02-24 12:30:37 +0000 |
---|---|---|
committer | Pratik Naik <pratiknaik@gmail.com> | 2009-02-24 12:30:37 +0000 |
commit | 1248c6325b28082232162551af055f3681b0813a (patch) | |
tree | 3d8a087421f0d74da7a7c3878e3ad1dddbf23697 | |
parent | 346ac0bba7cfbfbd0a7155163ca4125bc80ba463 (diff) | |
parent | 53cd102b39eb62567298430cbd94e40dd78d46a0 (diff) | |
download | rails-1248c6325b28082232162551af055f3681b0813a.tar.gz rails-1248c6325b28082232162551af055f3681b0813a.tar.bz2 rails-1248c6325b28082232162551af055f3681b0813a.zip |
Merge commit 'mainstream/master'
-rw-r--r-- | activerecord/CHANGELOG | 2 | ||||
-rw-r--r-- | activerecord/lib/active_record.rb | 1 | ||||
-rwxr-xr-x | activerecord/lib/active_record/base.rb | 2 | ||||
-rw-r--r-- | activerecord/lib/active_record/batches.rb | 70 | ||||
-rw-r--r-- | activerecord/test/cases/batches_test.rb | 49 | ||||
-rw-r--r-- | activesupport/CHANGELOG | 2 | ||||
-rw-r--r-- | activesupport/lib/active_support/vendor.rb | 4 | ||||
-rw-r--r-- | activesupport/lib/active_support/vendor/memcache-client-1.6.5/memcache.rb (renamed from activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb) | 325 |
8 files changed, 323 insertions, 132 deletions
diff --git a/activerecord/CHANGELOG b/activerecord/CHANGELOG index a63c09bbaa..fc6986912c 100644 --- a/activerecord/CHANGELOG +++ b/activerecord/CHANGELOG @@ -1,5 +1,7 @@ *Edge* +* Added ActiveRecord::Base.each and ActiveRecord::Base.find_in_batches for batch processing [DHH/Jamis Buck] + * Added that ActiveRecord::Base.exists? can be called with no arguments #1817 [Scott Taylor] diff --git a/activerecord/lib/active_record.rb b/activerecord/lib/active_record.rb index fa804f6a73..2f8c5c712f 100644 --- a/activerecord/lib/active_record.rb +++ b/activerecord/lib/active_record.rb @@ -48,6 +48,7 @@ module ActiveRecord autoload :AttributeMethods, 'active_record/attribute_methods' autoload :AutosaveAssociation, 'active_record/autosave_association' autoload :Base, 'active_record/base' + autoload :Batches, 'active_record/batches' autoload :Calculations, 'active_record/calculations' autoload :Callbacks, 'active_record/callbacks' autoload :Dirty, 'active_record/dirty' diff --git a/activerecord/lib/active_record/base.rb b/activerecord/lib/active_record/base.rb index 188168c89d..55ab1facf2 100755 --- a/activerecord/lib/active_record/base.rb +++ b/activerecord/lib/active_record/base.rb @@ -3148,7 +3148,7 @@ module ActiveRecord #:nodoc: # #save_with_autosave_associations to be wrapped inside a transaction. include AutosaveAssociation, NestedAttributes - include Aggregations, Transactions, Reflection, Calculations, Serialization + include Aggregations, Transactions, Reflection, Batches, Calculations, Serialization end end diff --git a/activerecord/lib/active_record/batches.rb b/activerecord/lib/active_record/batches.rb new file mode 100644 index 0000000000..9e9c8fbbf4 --- /dev/null +++ b/activerecord/lib/active_record/batches.rb @@ -0,0 +1,70 @@ +module ActiveRecord + module Batches # :nodoc: + def self.included(base) + base.extend(ClassMethods) + end + + # When processing large numbers of records, it's often a good idea to do so in batches to prevent memory ballooning. + module ClassMethods + # Yields each record that was found by the find +options+. The find is performed by find_in_batches + # with a batch size of 1000 (or as specified by the +batch_size+ option). + # + # Example: + # + # Person.each(:conditions => "age > 21") do |person| + # person.party_all_night! + # end + # + # Note: This method is only intended to use for batch processing of large amounts of records that wouldn't fit in + # memory all at once. If you just need to loop over less than 1000 records, it's probably better just to use the + # regular find methods. + def each(options = {}) + find_in_batches(options) do |records| + records.each { |record| yield record } + end + + self + end + + # Yields each batch of records that was found by the find +options+ as an array. The size of each batch is + # set by the +batch_size+ option; the default is 1000. + # + # You can control the starting point for the batch processing by supplying the +start+ option. This is especially + # useful if you want multiple workers dealing with the same processing queue. You can make worker 1 handle all the + # records between id 0 and 10,000 and worker 2 handle from 10,000 and beyond (by setting the +start+ option on that + # worker). + # + # It's not possible to set the order. That is automatically set to ascending on the primary key ("id ASC") + # to make the batch ordering work. This also mean that this method only works with integer-based primary keys. + # You can't set the limit either, that's used to control the the batch sizes. + # + # Example: + # + # Person.find_in_batches(:conditions => "age > 21") do |group| + # sleep(50) # Make sure it doesn't get too crowded in there! + # group.each { |person| person.party_all_night! } + # end + def find_in_batches(options = {}) + raise "You can't specify an order, it's forced to be #{batch_order}" if options[:order] + raise "You can't specify a limit, it's forced to be the batch_size" if options[:limit] + + start = options.delete(:start).to_i + + with_scope(:find => options.merge(:order => batch_order, :limit => options.delete(:batch_size) || 1000)) do + records = find(:all, :conditions => [ "#{table_name}.#{primary_key} >= ?", start ]) + + while records.any? + yield records + records = find(:all, :conditions => [ "#{table_name}.#{primary_key} > ?", records.last.id ]) + end + end + end + + + private + def batch_order + "#{table_name}.#{primary_key} ASC" + end + end + end +end
\ No newline at end of file diff --git a/activerecord/test/cases/batches_test.rb b/activerecord/test/cases/batches_test.rb new file mode 100644 index 0000000000..108d679108 --- /dev/null +++ b/activerecord/test/cases/batches_test.rb @@ -0,0 +1,49 @@ +require 'cases/helper' +require 'models/post' + +class EachTest < ActiveRecord::TestCase + fixtures :posts + + def setup + @posts = Post.all(:order => "id asc") + @total = Post.count + end + + def test_each_should_excecute_one_query_per_batch + assert_queries(Post.count + 1) do + Post.each(:batch_size => 1) do |post| + assert_kind_of Post, post + end + end + end + + def test_each_should_raise_if_the_order_is_set + assert_raise(RuntimeError) do + Post.each(:order => "title") { |post| post } + end + end + + def test_each_should_raise_if_the_limit_is_set + assert_raise(RuntimeError) do + Post.each(:limit => 1) { |post| post } + end + end + + def test_find_in_batches_should_return_batches + assert_queries(Post.count + 1) do + Post.find_in_batches(:batch_size => 1) do |batch| + assert_kind_of Array, batch + assert_kind_of Post, batch.first + end + end + end + + def test_find_in_batches_should_start_from_the_start_option + assert_queries(Post.count) do + Post.find_in_batches(:batch_size => 1, :start => 2) do |batch| + assert_kind_of Array, batch + assert_kind_of Post, batch.first + end + end + end +end
\ No newline at end of file diff --git a/activesupport/CHANGELOG b/activesupport/CHANGELOG index afffc9ab1a..70bea27cd1 100644 --- a/activesupport/CHANGELOG +++ b/activesupport/CHANGELOG @@ -1,5 +1,7 @@ *Edge* +* Update bundled memcache-client from 1.5.0.5 to 1.6.4.99. See http://www.mikeperham.com/2009/02/15/memcache-client-performance/ [Mike Perham] + * Ruby 1.9.1p0 fix: URI.unescape can decode multibyte chars. #2033 [MOROHASHI Kyosuke] * Time#to_s(:rfc822) uses #formatted_offset instead of unreliable and non-standard %z directive #1899 [Zachary Zolton] diff --git a/activesupport/lib/active_support/vendor.rb b/activesupport/lib/active_support/vendor.rb index cee45cfea1..39da70a9f3 100644 --- a/activesupport/lib/active_support/vendor.rb +++ b/activesupport/lib/active_support/vendor.rb @@ -9,9 +9,9 @@ end require 'builder' begin - gem 'memcache-client', '>= 1.5.0.5' + gem 'memcache-client', '>= 1.6.5' rescue Gem::LoadError - $:.unshift "#{File.dirname(__FILE__)}/vendor/memcache-client-1.5.0.5" + $:.unshift "#{File.dirname(__FILE__)}/vendor/memcache-client-1.6.5" end begin diff --git a/activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb b/activesupport/lib/active_support/vendor/memcache-client-1.6.5/memcache.rb index e90ddf3359..4d594c2089 100644 --- a/activesupport/lib/active_support/vendor/memcache-client-1.5.0.5/memcache.rb +++ b/activesupport/lib/active_support/vendor/memcache-client-1.6.5/memcache.rb @@ -1,52 +1,21 @@ -# All original code copyright 2005, 2006, 2007 Bob Cottrell, Eric Hodel, -# The Robot Co-op. All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions -# are met: -# -# 1. Redistributions of source code must retain the above copyright -# notice, this list of conditions and the following disclaimer. -# 2. Redistributions in binary form must reproduce the above copyright -# notice, this list of conditions and the following disclaimer in the -# documentation and/or other materials provided with the distribution. -# 3. Neither the names of the authors nor the names of their contributors -# may be used to endorse or promote products derived from this software -# without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE AUTHORS ``AS IS'' AND ANY EXPRESS -# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHORS OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, -# OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT -# OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR -# BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, -# WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE -# OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, -# EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - $TESTING = defined?($TESTING) && $TESTING require 'socket' require 'thread' require 'timeout' -require 'rubygems' require 'zlib' +require 'digest/sha1' ## # A Ruby client library for memcached. # -# This is intended to provide access to basic memcached functionality. It -# does not attempt to be complete implementation of the entire API, but it is -# approaching a complete implementation. class MemCache ## # The version of MemCache you are using. - VERSION = '1.5.0.5' + VERSION = '1.6.4.99' ## # Default options for the cache object. @@ -54,8 +23,10 @@ class MemCache DEFAULT_OPTIONS = { :namespace => nil, :readonly => false, - :multithread => false, - :failover => true + :multithread => true, + :failover => true, + :timeout => 0.5, + :logger => nil, } ## @@ -69,13 +40,6 @@ class MemCache DEFAULT_WEIGHT = 1 ## - # The amount of time to wait for a response from a memcached server. If a - # response is not completed within this time, the connection to the server - # will be closed and an error will be raised. - - attr_accessor :request_timeout - - ## # The namespace for this instance attr_reader :namespace @@ -91,9 +55,22 @@ class MemCache attr_reader :servers ## - # Whether this client should failover reads and writes to another server + # Socket timeout limit with this client, defaults to 0.5 sec. + # Set to nil to disable timeouts. + + attr_reader :timeout + + ## + # Should the client try to failover to another server if the + # first server is down? Defaults to true. + + attr_reader :failover + + ## + # Log debug/info/warn/error to the given Logger, defaults to nil. + + attr_reader :logger - attr_accessor :failover ## # Accepts a list of +servers+ and a list of +opts+. +servers+ may be # omitted. See +servers=+ for acceptable server list arguments. @@ -103,7 +80,11 @@ class MemCache # [:namespace] Prepends this value to all keys added or retrieved. # [:readonly] Raises an exception on cache writes when true. # [:multithread] Wraps cache access in a Mutex for thread safety. - # + # [:failover] Should the client try to failover to another server if the + # first server is down? Defaults to true. + # [:timeout] Time to use as the socket read timeout. Defaults to 0.5 sec, + # set to nil to disable timeouts (this is a major performance penalty in Ruby 1.8). + # [:logger] Logger to use for info/debug output, defaults to nil # Other options are ignored. def initialize(*args) @@ -130,9 +111,15 @@ class MemCache @namespace = opts[:namespace] @readonly = opts[:readonly] @multithread = opts[:multithread] - @failover = opts[:failover] + @timeout = opts[:timeout] + @failover = opts[:failover] + @logger = opts[:logger] @mutex = Mutex.new if @multithread - @buckets = [] + + logger.info { "memcache-client #{VERSION} #{Array(servers).inspect}" } if logger + + Thread.current[:memcache_client] = self.object_id if !@multithread + self.servers = servers end @@ -140,8 +127,8 @@ class MemCache # Returns a string representation of the cache object. def inspect - "<MemCache: %d servers, %d buckets, ns: %p, ro: %p>" % - [@servers.length, @buckets.length, @namespace, @readonly] + "<MemCache: %d servers, ns: %p, ro: %p>" % + [@servers.length, @namespace, @readonly] end ## @@ -162,7 +149,7 @@ class MemCache # Set the servers that the requests will be distributed between. Entries # can be either strings of the form "hostname:port" or # "hostname:port:weight" or MemCache::Server objects. - + # def servers=(servers) # Create the server objects. @servers = Array(servers).collect do |server| @@ -172,21 +159,17 @@ class MemCache port ||= DEFAULT_PORT weight ||= DEFAULT_WEIGHT Server.new self, host, port, weight - when Server - if server.memcache.multithread != @multithread then - raise ArgumentError, "can't mix threaded and non-threaded servers" - end - server else - raise TypeError, "cannot convert #{server.class} into MemCache::Server" + server end end - # Create an array of server buckets for weight selection of servers. - @buckets = [] - @servers.each do |server| - server.weight.times { @buckets.push(server) } - end + logger.debug { "Servers now: #{@servers.inspect}" } if logger + + # There's no point in doing this if there's only one server + @continuum = create_continuum_for(@servers) if @servers.size > 1 + + @servers end ## @@ -210,6 +193,7 @@ class MemCache def get(key, raw = false) with_server(key) do |server, cache_key| value = cache_get server, cache_key + logger.debug { "GET #{key} from #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger return nil if value.nil? value = Marshal.load value unless raw return value @@ -233,6 +217,8 @@ class MemCache # cache["a"] = 1 # cache["b"] = 2 # cache.get_multi "a", "b" # => { "a" => 1, "b" => 2 } + # + # Note that get_multi assumes the values are marshalled. def get_multi(*keys) raise MemCacheError, 'No active servers' unless active? @@ -252,15 +238,20 @@ class MemCache results = {} server_keys.each do |server, keys_for_server| - keys_for_server = keys_for_server.join ' ' - values = cache_get_multi server, keys_for_server - values.each do |key, value| - results[cache_keys[key]] = Marshal.load value + keys_for_server_str = keys_for_server.join ' ' + begin + values = cache_get_multi server, keys_for_server_str + values.each do |key, value| + results[cache_keys[key]] = Marshal.load value + end + rescue IndexError => e + # Ignore this server and try the others + logger.warn { "Unable to retrieve #{keys_for_server.size} elements from #{server.inspect}: #{e.message}"} if logger end end return results - rescue TypeError, IndexError => err + rescue TypeError => err handle_error nil, err end @@ -285,12 +276,19 @@ class MemCache # Warning: Readers should not call this method in the event of a cache miss; # see MemCache#add. + ONE_MB = 1024 * 1024 + def set(key, value, expiry = 0, raw = false) raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| value = Marshal.dump value unless raw - command = "set #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" + logger.debug { "SET #{key} to #{server.inspect}: #{value ? value.to_s.size : 'nil'}" } if logger + + data = value.to_s + raise MemCacheError, "Value too large, memcached can only store 1MB of data per key" if data.size > ONE_MB + + command = "set #{cache_key} 0 #{expiry} #{data.size}\r\n#{data}\r\n" with_socket_management(server) do |socket| socket.write command @@ -319,7 +317,8 @@ class MemCache raise MemCacheError, "Update of readonly cache" if @readonly with_server(key) do |server, cache_key| value = Marshal.dump value unless raw - command = "add #{cache_key} 0 #{expiry} #{value.size}\r\n#{value}\r\n" + logger.debug { "ADD #{key} to #{server}: #{value ? value.to_s.size : 'nil'}" } if logger + command = "add #{cache_key} 0 #{expiry} #{value.to_s.size}\r\n#{value}\r\n" with_socket_management(server) do |socket| socket.write command @@ -353,7 +352,6 @@ class MemCache raise MemCacheError, "Update of readonly cache" if @readonly begin - @mutex.lock if @multithread @servers.each do |server| with_socket_management(server) do |socket| socket.write "flush_all\r\n" @@ -364,8 +362,6 @@ class MemCache end rescue IndexError => err handle_error nil, err - ensure - @mutex.unlock if @multithread end end @@ -424,7 +420,7 @@ class MemCache while line = socket.gets do raise_on_error_response! line break if line == "END\r\n" - if line =~ /\ASTAT ([\w]+) ([\w\.\:]+)/ then + if line =~ /\ASTAT ([\S]+) ([\w\.\:]+)/ then name, value = $1, $2 stats[name] = case name when 'version' @@ -478,6 +474,14 @@ class MemCache end ## + # Returns an interoperable hash value for +key+. (I think, docs are + # sketchy for down servers). + + def hash_for(key) + Zlib.crc32(key) + end + + ## # Pick a server to handle the request based on a hash of the key. def get_server_for_key(key, options = {}) @@ -487,27 +491,17 @@ class MemCache raise MemCacheError, "No servers available" if @servers.empty? return @servers.first if @servers.length == 1 - hkey = hash_for key - - if @failover - 20.times do |try| - server = @buckets[hkey % @buckets.compact.size] - return server if server.alive? - hkey += hash_for "#{try}#{key}" - end - else - return @buckets[hkey % @buckets.compact.size] - end - - raise MemCacheError, "No servers available" - end + hkey = hash_for(key) - ## - # Returns an interoperable hash value for +key+. (I think, docs are - # sketchy for down servers). + 20.times do |try| + entryidx = Continuum.binary_search(@continuum, hkey) + server = @continuum[entryidx].server + return server if server.alive? + break unless failover + hkey = hash_for "#{try}#{key}" + end - def hash_for(key) - (Zlib.crc32(key) >> 16) & 0x7fff + raise MemCacheError, "No servers available" end ## @@ -608,24 +602,28 @@ class MemCache # failures (but does still apply to unexpectedly lost connections etc.). def with_socket_management(server, &block) + check_multithread_status! + @mutex.lock if @multithread retried = false - + begin socket = server.socket # Raise an IndexError to show this server is out of whack. If were inside # a with_server block, we'll catch it and attempt to restart the operation. - + raise IndexError, "No connection to server (#{server.status})" if socket.nil? - + block.call(socket) - + rescue SocketError => err - server.mark_dead(err.message) + logger.warn { "Socket failure: #{err.message}" } if logger + server.mark_dead(err) handle_error(server, err) - rescue MemCacheError, SocketError, SystemCallError, IOError => err + rescue MemCacheError, SystemCallError, IOError => err + logger.warn { "Generic failure: #{err.class.name}: #{err.message}" } if logger handle_error(server, err) if retried || socket.nil? retried = true retry @@ -640,8 +638,9 @@ class MemCache server, cache_key = request_setup(key) yield server, cache_key rescue IndexError => e + logger.warn { "Server failed: #{e.class.name}: #{e.message}" } if logger if !retried && @servers.size > 1 - puts "Connection to server #{server.inspect} DIED! Retrying operation..." + logger.info { "Connection to server #{server.inspect} DIED! Retrying operation..." } if logger retried = true retry end @@ -677,6 +676,37 @@ class MemCache end end + def create_continuum_for(servers) + total_weight = servers.inject(0) { |memo, srv| memo + srv.weight } + continuum = [] + + servers.each do |server| + entry_count_for(server, servers.size, total_weight).times do |idx| + hash = Digest::SHA1.hexdigest("#{server.host}:#{server.port}:#{idx}") + value = Integer("0x#{hash[0..7]}") + continuum << Continuum::Entry.new(value, server) + end + end + + continuum.sort { |a, b| a.value <=> b.value } + end + + def entry_count_for(server, total_servers, total_weight) + ((total_servers * Continuum::POINTS_PER_SERVER * server.weight) / Float(total_weight)).floor + end + + def check_multithread_status! + return if @multithread + + if Thread.current[:memcache_client] != self.object_id + raise MemCacheError, <<-EOM + You are accessing this memcache-client instance from multiple threads but have not enabled multithread support. + Normally: MemCache.new(['localhost:11211'], :multithread => true) + In Rails: config.cache_store = [:mem_cache_store, 'localhost:11211', { :multithread => true }] + EOM + end + end + ## # This class represents a memcached server instance. @@ -690,13 +720,6 @@ class MemCache CONNECT_TIMEOUT = 0.25 ## - # The amount of time to wait for a response from a memcached server. - # If a response isn't received within this time limit, - # the server will be marked as down. - - SOCKET_TIMEOUT = 0.5 - - ## # The amount of time to wait before attempting to re-establish a # connection with a server that is marked dead. @@ -727,6 +750,8 @@ class MemCache attr_reader :status + attr_reader :logger + ## # Create a new MemCache::Server object for the memcached instance # listening on the given host and port, weighted by the given weight. @@ -735,17 +760,15 @@ class MemCache raise ArgumentError, "No host specified" if host.nil? or host.empty? raise ArgumentError, "No port specified" if port.nil? or port.to_i.zero? - @memcache = memcache @host = host @port = port.to_i @weight = weight.to_i - @multithread = @memcache.multithread - @mutex = Mutex.new - @sock = nil @retry = nil @status = 'NOT CONNECTED' + @timeout = memcache.timeout + @logger = memcache.logger end ## @@ -770,7 +793,6 @@ class MemCache # Returns the connected socket object on success or nil on failure. def socket - @mutex.lock if @multithread return @sock if @sock and not @sock.closed? @sock = nil @@ -780,8 +802,7 @@ class MemCache # Attempt to connect if not already connected. begin - - @sock = TCPTimeoutSocket.new @host, @port + @sock = @timeout ? TCPTimeoutSocket.new(@host, @port, @timeout) : TCPSocket.new(@host, @port) if Socket.constants.include? 'TCP_NODELAY' then @sock.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1 @@ -789,12 +810,11 @@ class MemCache @retry = nil @status = 'CONNECTED' rescue SocketError, SystemCallError, IOError, Timeout::Error => err - mark_dead err.message + logger.warn { "Unable to open socket: #{err.class.name}, #{err.message}" } if logger + mark_dead err end return @sock - ensure - @mutex.unlock if @multithread end ## @@ -802,24 +822,23 @@ class MemCache # object. The server is not considered dead. def close - @mutex.lock if @multithread @sock.close if @sock && !@sock.closed? @sock = nil @retry = nil @status = "NOT CONNECTED" - ensure - @mutex.unlock if @multithread end ## # Mark the server as dead and close its socket. - def mark_dead(reason = "Unknown error") + def mark_dead(error) @sock.close if @sock && !@sock.closed? @sock = nil @retry = Time.now + RETRY_DELAY - @status = sprintf "%s:%s DEAD: %s, will retry at %s", @host, @port, reason, @retry + reason = "#{error.class.name}: #{error.message}" + @status = sprintf "%s:%s DEAD (%s), will retry at %s", @host, @port, reason, @retry + @logger.info { @status } if @logger end end @@ -833,36 +852,84 @@ end # TCPSocket facade class which implements timeouts. class TCPTimeoutSocket - def initialize(*args) + + def initialize(host, port, timeout) Timeout::timeout(MemCache::Server::CONNECT_TIMEOUT, SocketError) do - @sock = TCPSocket.new(*args) - @len = MemCache::Server::SOCKET_TIMEOUT.to_f || 0.5 + @sock = TCPSocket.new(host, port) + @len = timeout end end - + def write(*args) Timeout::timeout(@len, SocketError) do @sock.write(*args) end end - + def gets(*args) Timeout::timeout(@len, SocketError) do @sock.gets(*args) end end - + def read(*args) Timeout::timeout(@len, SocketError) do @sock.read(*args) end end - + def _socket @sock end - + def method_missing(meth, *args) @sock.__send__(meth, *args) end -end
\ No newline at end of file + + def closed? + @sock.closed? + end + + def close + @sock.close + end +end + +module Continuum + POINTS_PER_SERVER = 160 # this is the default in libmemcached + + # Find the closest index in Continuum with value <= the given value + def self.binary_search(ary, value, &block) + upper = ary.size - 1 + lower = 0 + idx = 0 + + while(lower <= upper) do + idx = (lower + upper) / 2 + comp = ary[idx].value <=> value + + if comp == 0 + return idx + elsif comp > 0 + upper = idx - 1 + else + lower = idx + 1 + end + end + return upper + end + + class Entry + attr_reader :value + attr_reader :server + + def initialize(val, srv) + @value = val + @server = srv + end + + def inspect + "<#{value}, #{server.host}:#{server.port}>" + end + end +end |