粗略阅读[Dalli的session_id生成部分][只读到一半]

def get_session(env, sid)
  unless sid and session = @pool.get(sid)
    sid, session = generate_sid, {}
    unless @pool.add(sid, session)
      raise "Session collision on '#{sid.inspect}'"
    end
  end
  [sid, session]
end

在这里似乎创建了一个会话ID。
除非sid和session = @pool.get(sid),否则在这里检查sid的存在,如果不存在,则调用generate_sid来创建sid。

def generate_sid
  loop do
    sid = super
    break sid unless @pool.get(sid)
  end
end

用 @pool.get(sid) 将以 super 创建的 sid 存储到 memcached 中,以 sid 作为键。好像是不断循环直到消除重复和错误…?

顺便说一下,这是”super”。

def generate_sid(secure = @sid_secure)
  if secure
    SecureRandom.hex(@sid_length)
  else
    "%0#{@sid_length}x" % Kernel.rand(2**@sidbits - 1)
  end
rescue NotImplementedError
  generate_sid(false)
end
def get(key, options=nil)
  resp = perform(:get, key)
  (!resp || resp == 'Not found') ? nil : resp
end

在perform(:get, key)中,我们对key进行格式检查并将其添加到memcached中。

def perform(op, key, *args)
  key = key.to_s
  validate_key(key)
  key = key_with_namespace(key)
  begin
    server = ring.server_for_key(key)
    server.request(op, key, *args)
  rescue NetworkError => e
    Dalli.logger.debug { e.message }
    Dalli.logger.debug { "retrying request with new server" }
    retry
  end
end

进一步阅读ring.server_for_key试试看。

ring.server_for_key(key)是什么意思?

def ring
    @ring ||= Dalli::Ring.new(
        Array(@servers).map do |s|
        Dalli::Server.new(s, @options)
    end, @options
    )
end 

既然提到了Dalli :: Ring.new,首先让我们关注一下Array(@servers).map。
@servers是在initialize中创建的。

def initialize(servers=nil, options={}) 
  @servers = env_servers || servers || '127.0.0.1:11211' 
  @options = { :expires_in => 0 }.merge(options) 
  self.extend(Dalli::Client::MemcacheClientCompatibility) if Dalli::Client.compatibility_mode 
  @ring = nil 
end 

我大致可以想象,但还是先看看env_servers。

def env_servers
  ENV['MEMCACHE_SERVERS'] ? ENV['MEMCACHE_SERVERS'].split(',') : nil
end

看起来是在查看配置文件。如果没有的话,就使用通过参数传递的服务器,如果连那个也没有,则使用默认的’127.0.0.1:11211’,这是Dalli::Ring.new的操作。

def initialize(servers, options)
  @servers = servers
  @continuum = nil
  if servers.size > 1
    total_weight = servers.inject(0) { |memo, srv| memo + srv.weight }
    continuum = []
    servers.each do |server|
      entry_count_for(server, servers.size, total_weight).times do |idx|
        hash = Digest::SHA1.hexdigest("#{server.hostname}:#{server.port}:#{idx}")
        value = Integer("0x#{hash[0..7]}")
        continuum << Dalli::Ring::Entry.new(value, server)
      end
    end
    @continuum = continuum.sort { |a, b| a.value <=> b.value }
  end

  threadsafe! unless options[:threadsafe] == false
  @failover = options[:failover] != false
end

因为正直读起来很困难,所以下一个。

def server_for_key(key)
  if @continuum
    hkey = hash_for(key)
    20.times do |try|
      entryidx = self.class.binary_search(@continuum, hkey)
      server = @continuum[entryidx].server
      return server if server.alive?
      break unless @failover
      hkey = hash_for("#{try}#{key}")
    end
  else
    server = @servers.first
    return server if server && server.alive?
  end

  raise Dalli::RingError, "No server available"
end

如果服务器存活,就返回server。这个代码是通过判断server是否存在且存活来返回值的。然而,实际上server是Dalli::Server类的一个实例,所以我们需要查看Dalli::Server#request方法。

def request(op, *args)
  raise Dalli::NetworkError, "#{hostname}:#{port} is down: #{@error} #{@msg}" unless alive?
  begin
    send(op, *args)
  rescue Dalli::NetworkError
    raise
  rescue Dalli::MarshalError => ex
    Dalli.logger.error "Marshalling error for key '#{args.first}': #{ex.message}"
    Dalli.logger.error "You are trying to cache a Ruby object which cannot be serialized to memcached."
    Dalli.logger.error ex.backtrace.join("\n\t")
    false
  rescue Dalli::DalliError
    raise
  rescue => ex
    Dalli.logger.error "Unexpected exception in Dalli: #{ex.class.name}: #{ex.message}"
    Dalli.logger.error "This is a bug in Dalli, please enter an issue in Github if it does not already exist."
    Dalli.logger.error ex.backtrace.join("\n\t")
    down!
  end
end

虽然我一直在进行救援,但我不在意。总之就是调用了 send(op, *args)。因为调用了 Object#send,所以在谈论 op 是什么。通过 ring.server_for_key 返回的 server,调用了 server.request(op, key, *args),所以只需查看 perform(op, key, *args) 中的 op 就可以了。

换言之

def generate_sid
  loop do
    sid = super
    break sid unless @pool.get(sid)
  end
end

仅需要一种选择,以下是对该句子的汉语本地化释义:
调用了 @pool.get(sid) 的部分。

def get(key, options=nil)
  resp = perform(:get, key)
  (!resp || resp == 'Not found') ? nil : resp
end

所以op的真正身份是:get。
也就是说,只需要看Dalli::Server#get就可以了。

def get(key)
  req = [REQUEST, OPCODES[:get], key.bytesize, 0, 0, 0, key.bytesize, 0, 0, key].pack(FORMAT[:get])
  write(req)
  generic_response(true)
end

我在写。

def write(bytes)
  begin
    @sock.write(bytes)
  rescue SystemCallError, Timeout::Error
    failure!
    retry
  end
end

当@sock被调用ring.server_for_key时,会调用server.alive?并在其中生成。

def alive?
  return true if @sock

  if @last_down_at && @last_down_at + options[:down_retry_delay] >= Time.now
    time = @last_down_at + options[:down_retry_delay] - Time.now
    Dalli.logger.debug { "down_retry_delay not reached for #{hostname}:#{port} (%.3f seconds left)" % time }
    return false
  end

  connect
  !!@sock
rescue Dalli::NetworkError
  false
end

中指的是连接+的事务。

def connect
  Dalli.logger.debug { "Dalli::Server#connect #{hostname}:#{port}" }

  begin
    if @hostname =~ /^\//
      @sock = USocket.new(hostname)
    elsif options[:async]
      raise Dalli::DalliError, "EM support not enabled, as em-synchrony is not installed." if not defined?(AsyncSocket)
      @sock = AsyncSocket.open(hostname, port, :timeout => options[:socket_timeout])
    else
      @sock = KSocket.open(hostname, port, :timeout => options[:socket_timeout])
    end
    @version = version # trigger actual connect
    sasl_authentication if need_auth?
    up!
  rescue Dalli::DalliError # SASL auth failure
    raise
  rescue SystemCallError, Timeout::Error, EOFError, SocketError
    # SocketError = DNS resolution failure
    failure!
    retry
  end
end

因为在这附近读起来有些困难,所以暂时停止。

广告
将在 10 秒后关闭
bannerAds