粗略阅读[Dalli的session_id生成部分][只读到一半]
def get_session(env, sid)
unless sid and session = @pool.get(sid)
sid, session = generate_sid, {}
unless @pool.add(sid, session)
raise "Session collision on '#{sid.inspect}'"
end
end
[sid, session]
end
在这里似乎创建了一个会话ID。
除非sid和session = @pool.get(sid),否则在这里检查sid的存在,如果不存在,则调用generate_sid来创建sid。
def generate_sid
loop do
sid = super
break sid unless @pool.get(sid)
end
end
用 @pool.get(sid) 将以 super 创建的 sid 存储到 memcached 中,以 sid 作为键。好像是不断循环直到消除重复和错误…?
顺便说一下,这是”super”。
def generate_sid(secure = @sid_secure)
if secure
SecureRandom.hex(@sid_length)
else
"%0#{@sid_length}x" % Kernel.rand(2**@sidbits - 1)
end
rescue NotImplementedError
generate_sid(false)
end
def get(key, options=nil)
resp = perform(:get, key)
(!resp || resp == 'Not found') ? nil : resp
end
在perform(:get, key)中,我们对key进行格式检查并将其添加到memcached中。
def perform(op, key, *args)
key = key.to_s
validate_key(key)
key = key_with_namespace(key)
begin
server = ring.server_for_key(key)
server.request(op, key, *args)
rescue NetworkError => e
Dalli.logger.debug { e.message }
Dalli.logger.debug { "retrying request with new server" }
retry
end
end
进一步阅读ring.server_for_key试试看。
ring.server_for_key(key)是什么意思?
def ring
@ring ||= Dalli::Ring.new(
Array(@servers).map do |s|
Dalli::Server.new(s, @options)
end, @options
)
end
既然提到了Dalli :: Ring.new,首先让我们关注一下Array(@servers).map。
@servers是在initialize中创建的。
def initialize(servers=nil, options={})
@servers = env_servers || servers || '127.0.0.1:11211'
@options = { :expires_in => 0 }.merge(options)
self.extend(Dalli::Client::MemcacheClientCompatibility) if Dalli::Client.compatibility_mode
@ring = nil
end
我大致可以想象,但还是先看看env_servers。
def env_servers
ENV['MEMCACHE_SERVERS'] ? ENV['MEMCACHE_SERVERS'].split(',') : nil
end
看起来是在查看配置文件。如果没有的话,就使用通过参数传递的服务器,如果连那个也没有,则使用默认的’127.0.0.1:11211’,这是Dalli::Ring.new的操作。
def initialize(servers, options)
@servers = servers
@continuum = nil
if servers.size > 1
total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
continuum = []
servers.each do |server|
entry_count_for(server, servers.size, total_weight).times do |idx|
hash = Digest::SHA1.hexdigest("#{server.hostname}:#{server.port}:#{idx}")
value = Integer("0x#{hash[0..7]}")
continuum << Dalli::Ring::Entry.new(value, server)
end
end
@continuum = continuum.sort { |a, b| a.value <=> b.value }
end
threadsafe! unless options[:threadsafe] == false
@failover = options[:failover] != false
end
因为正直读起来很困难,所以下一个。
def server_for_key(key)
if @continuum
hkey = hash_for(key)
20.times do |try|
entryidx = self.class.binary_search(@continuum, hkey)
server = @continuum[entryidx].server
return server if server.alive?
break unless @failover
hkey = hash_for("#{try}#{key}")
end
else
server = @servers.first
return server if server && server.alive?
end
raise Dalli::RingError, "No server available"
end
如果服务器存活,就返回server。这个代码是通过判断server是否存在且存活来返回值的。然而,实际上server是Dalli::Server类的一个实例,所以我们需要查看Dalli::Server#request方法。
def request(op, *args)
raise Dalli::NetworkError, "#{hostname}:#{port} is down: #{@error} #{@msg}" unless alive?
begin
send(op, *args)
rescue Dalli::NetworkError
raise
rescue Dalli::MarshalError => ex
Dalli.logger.error "Marshalling error for key '#{args.first}': #{ex.message}"
Dalli.logger.error "You are trying to cache a Ruby object which cannot be serialized to memcached."
Dalli.logger.error ex.backtrace.join("\n\t")
false
rescue Dalli::DalliError
raise
rescue => ex
Dalli.logger.error "Unexpected exception in Dalli: #{ex.class.name}: #{ex.message}"
Dalli.logger.error "This is a bug in Dalli, please enter an issue in Github if it does not already exist."
Dalli.logger.error ex.backtrace.join("\n\t")
down!
end
end
虽然我一直在进行救援,但我不在意。总之就是调用了 send(op, *args)。因为调用了 Object#send,所以在谈论 op 是什么。通过 ring.server_for_key 返回的 server,调用了 server.request(op, key, *args),所以只需查看 perform(op, key, *args) 中的 op 就可以了。
换言之
def generate_sid
loop do
sid = super
break sid unless @pool.get(sid)
end
end
仅需要一种选择,以下是对该句子的汉语本地化释义:
调用了 @pool.get(sid) 的部分。
def get(key, options=nil)
resp = perform(:get, key)
(!resp || resp == 'Not found') ? nil : resp
end
所以op的真正身份是:get。
也就是说,只需要看Dalli::Server#get就可以了。
def get(key)
req = [REQUEST, OPCODES[:get], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:get])
write(req)
generic_response(true)
end
我在写。
def write(bytes)
begin
@sock.write(bytes)
rescue SystemCallError, Timeout::Error
failure!
retry
end
end
当@sock被调用ring.server_for_key时,会调用server.alive?并在其中生成。
def alive?
return true if @sock
if @last_down_at && @last_down_at + options[:down_retry_delay] >= Time.now
time = @last_down_at + options[:down_retry_delay] - Time.now
Dalli.logger.debug { "down_retry_delay not reached for #{hostname}:#{port} (%.3f seconds left)" % time }
return false
end
connect
!!@sock
rescue Dalli::NetworkError
false
end
中指的是连接+的事务。
def connect
Dalli.logger.debug { "Dalli::Server#connect #{hostname}:#{port}" }
begin
if @hostname =~ /^\//
@sock = USocket.new(hostname)
elsif options[:async]
raise Dalli::DalliError, "EM support not enabled, as em-synchrony is not installed." if not defined?(AsyncSocket)
@sock = AsyncSocket.open(hostname, port, :timeout => options[:socket_timeout])
else
@sock = KSocket.open(hostname, port, :timeout => options[:socket_timeout])
end
@version = version # trigger actual connect
sasl_authentication if need_auth?
up!
rescue Dalli::DalliError # SASL auth failure
raise
rescue SystemCallError, Timeout::Error, EOFError, SocketError
# SocketError = DNS resolution failure
failure!
retry
end
end
因为在这附近读起来有些困难,所以暂时停止。