我创建了一个钩取GAE/Go的RPC的库称为”aespy”

@laco0416,我是一个用于介入GAE/Go的RPC并利用通信内容进行操作的库的开发者。我将在下面写一些关于这个库的工作原理以及在查看GAE/Go的RPC时需要注意的事项。

laco0416/aespy的信息,请给出一个完全用中文解释的选项。

请将以下内容用中文进行表达:

环境

无论是GAE/Go还是其他GCP服务(如Datastore、Memcache、UrlFetch等),它们都通过RPC进行通信。RPC使用一种名为Protocol Buffer(简称pb)的格式来序列化对象,这使得反序列化变得简单。然而,GAE/J已经预先准备了名为ApiProxy和Delegate的机制,可以相对容易地干预RPC。但是,GAE/Go目前还没有提供相应的API。

顺便说一下,GAE/Go的Datastore没有提供可以获取到Context的事件处理程序。尽管可以通过实现PropertyLoadSaver来介入Save和Load操作,但由于无法获取AppEngine的Context,因此可以执行的处理操作非常有限,这是一个问题。例如,无法在回调函数中自动将实体保存到Memcache中,因此通常只能在应用程序中手动执行这个过程。

一个名为goon的库可以自动将Datastore的操作转换为Memcache的操作。这个库在内部处理这些过程,非常方便。现在,如果没有goon,全部都要手动编写,实在是很痛苦。但是它只能自动缓存到Memcache,其他的事情还是需要手动编写。

今回aespyを作ったモチベーションは、「DatastoreにPutされるエンティティを自動でBigQueryにStream Insertする」ことである。goonのようにライブラリを通じてDatastoreを操作するのではなく、RPCにフックするようにしたのはgoonと併用したいというのもあるし、極力既存のアプリケーションのコードを変えずに導入したいからだ。

aespyの仕組み

かつてGAE/JにはMemvacheというライブラリがあった。これはGAE/JのApiProxyとDelegateを使ってDatastoreへのRPCを乗っ取り、自動でMemcacheの操作に置き換えるライブラリである。今回BigQueryに流し込むライブラリを作るにあたって、まずはGAE/GoにおけるApiProxyとDelegateを作る必要があった。

GAE/Go的RPC

在GAE/Go中,appengine.Context是一切的。如果获取不到Context,几乎什么都做不了;而反过来说,只要有Context,就可以做任何事情。这是因为需要调用Context.Call()来实际执行RPC。

Context是一个接口,只要满足了实现,就可以创建一个包装了appengine.Context的aespy.Context。要创建一个什么都不做的简单包装器,只需编写以下代码。我在aetest的实现中包装了Context并接管了RPC,这对我来说非常有参考意义。我之前对于是否可以在库中使用appengine_internal存在一些疑虑,但实际上进行了部署和使用后完全没有问题,应该是没问题的。

package aespy

import (
    "net/http"

    "appengine"
    "appengine_internal"
)

// Context はappengine.Contextを継承したinterfaceである
type Context interface {
    appengine.Context
}

// NewContext はappengine.ContextをラップしたContextを返す
func NewContext(req *http.Request) Context {
    c := &context{Context: appengine.NewContext(req)}
    return c
}

// FromContext はすでにあるappengine.Contextをラップする
func FromContext(ctx appengine.Context) Context {
    c := &context{Context: ctx}
    return c
}

// aespy.Contextの実装をする構造体
type context struct {
    appengine.Context
}

func (c *context) Request() interface{}                         { return c.Context.Request() }
func (c *context) FullyQualifiedAppID() string                  { return c.Context.FullyQualifiedAppID() }
func (c *context) Debugf(format string, args ...interface{})    { c.Context.Debugf(format, args...) }
func (c *context) Infof(format string, args ...interface{})     { c.Context.Infof(format, args...) }
func (c *context) Warningf(format string, args ...interface{})  { c.Context.Warningf(format, args...) }
func (c *context) Errorf(format string, args ...interface{})    { c.Context.Errorf(format, args...) }
func (c *context) Criticalf(format string, args ...interface{}) { c.Context.Criticalf(format, args...) }

func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
    return c.Context.Call(service, method, in, out, opts)
}

我想你已经明白了

func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
    return c.Context.Call(service, method, in, out, opts)
}

如果在RPC调用中加入钩子处理,那么aespy就会完成。

我希望你能阅读aespy的源代码,以了解实际上是如何挂钩RPC的机制。虽然没有文件说明,但这个库并不是很庞大,如果有15分钟的时间我认为你可以完全理解它。

解释Context.Call

关于Context.Call机制的解释,我想先说明一下。当我听到RPC时,我无意识地认为它是异步处理(尽管没有特定的依据),然而这个Call方法在内部无论如何都是同步处理的行为。

对于每个形式参数,简要解释如下:

服务:字串

RPC的目标。如果是针对Datastore的RPC,则可能会使用datastore_v3等。

方法:字符串

如果是通过RPC调用的函数,如果是Datastore的Put操作,就会包含Put。

在:appengine_internal.ProtoMessage中

根据服务和方法的需求,在使用pb的最基本接口发送请求内容时,需要进行类型断言。

重要的是,在按钮前后不改变。

输出:appengine_internal.ProtoMessage

通过RPC返回的响应内容。将其作为pb的最基本接口传递,因此需根据service和method进行类型断言。

重要的是,out是一个参考,在调用之前为空,但在调用之后会有值填充进去。

选项: *appengine_internal.CallOptions

RPC的配置。它是通过结构体来表示的,以便能够清楚地识别其类型。虽然它具有设置超时等功能,但很少被钩子函数使用。

处理RPC的关键是明确了解其中的输入(in)和输出(out)。如果将RPC的前后处理分别命名为preCall和postCall,那么最基本的实现应该如下所示:

func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
    c.preCall(service, method, in) //この段階ではinにしか意味がない
    ret := c.Context.Call(service, method, in, out, opts) //本来のRPC
    c.postCall(service, method, in, out) //ここでinとout両方に意味がある
    return ret //本来のRPCの結果を返す
}

向数据存储器发送RPC请求

既然已经能够窥探到所有的RPC,现在可以着手进行对Datastore的RPC的正常处理,这是最初的目标。

数据存储.放置

在调用Datastore.Put时,传递给Context.Call的参数如下:

    • service: “datastore_v3”

method: “Put”

in: *datastore.PutRequest

out: *datastore.PutResponse

datastore.PutRequestのdatastoreは”appengine/datastore”パッケージではなく、”appengine_internal/datastore”パッケージであるのに注意。

PutRequestは「Putされるエンティティ」を内包するpbである。PutされたエンティティはPurRequest.GetEntityで取得できる。注意する点は、GetEntityはEntityProto型の スライス を返すということである。DatastoreはPutでもGetでもDeleteでも内部的には**Multiを呼んでおり、常に複数で扱っているのが理由である。

func prePut(c appengine.Context, in appengine_internal.ProtoMessage) error {
    req, ok := in.(*pb.PutRequest)
    if !ok {
        return fmt.Errorf("Not PutRequest: %#v", in)
    }
    for _, proto := range req.GetEntity() {
        entity, err := protoToEntity(c, proto)
        if err != nil {
            return err
        }
        fmt.Printf("%+v", entity)
    }
    return nil
}

EntityProto型はDatastoreとやり取りするエンティティ1件を表す構造体である。エンティティは自身のキーとプロパティの情報を持っている。EntityProto型を扱いやすいよう独自のEntity型に変換するprotoToEntityを次のように実装した。

import (
    "appengine"
    "appengine/datastore"
    pb "appengine_internal/datastore"
)

type Entity struct {
    Key        *datastore.Key         
    Kind       string                 
    Properties map[string]interface{}
}

func unwrapPropertyValue(pv *pb.PropertyValue) interface{} {
    var value interface{}
    switch {
    case pv.BooleanValue != nil:
        value = pv.GetBooleanValue()
    case pv.Int64Value != nil:
        value = pv.GetInt64Value()
    case pv.DoubleValue != nil:
        value = pv.GetDoubleValue()
    case pv.StringValue != nil:
        value = pv.GetStringValue()
    case pv.Pointvalue != nil:
        value = pv.GetPointvalue()
    case pv.Referencevalue != nil:
        value = pv.GetReferencevalue()
    case pv.Uservalue != nil:
        value = pv.GetUservalue()
    default:
        value = pv.String()
    }
    return value
}

func protoToEntity(c appengine.Context, proto *pb.EntityProto) (*Entity, error) {
    entity := new(Entity)
    key, err := protoToKey(c, proto.GetKey())
    if err != nil {
        return nil, err
    }
    entity.Key = key
    entity.Kind = key.Kind()
    entity.Properties = make(map[string]interface{})
    for _, p := range proto.GetProperty() {
        entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
    }
    // no index field
    for _, p := range proto.GetRawProperty() {
        entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
    }
    return entity, nil
}

func protoToKey(c appengine.Context, proto *pb.Reference) (*datastore.Key, error) {
    var key *datastore.Key
    for _, e := range proto.GetPath().GetElement() {
        key = datastore.NewKey(c, e.GetType(), e.GetName(), e.GetId(), key)
        if _, err := key.GobEncode(); err != nil {
            return nil, err
        }
    }
    return key, nil
}

最需要注意的是datastore中的”noindex”属性处理。通过EntityProto.GetProperty仅能获取带索引的属性,而需要使用EntityProto.GetRawProperty来获取”noindex”属性。

    for _, p := range proto.GetProperty() {
        entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
    }
    // no index field
    for _, p := range proto.GetRawProperty() {
        entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
    }

Callの前のinに含まれるエンティティのキーはアプリケーション側でNewKeyもしくはNewIncompleteKeyしたままのキーである。NewIncompleteKeyの場合はキーは空であるため、実際にPutされ作成されたKeyを取得するにはCallの後のoutを見る必要がある。*datastore.PutResponseはPutされたエンティティのキーだけを返すが、inと併用することで、自身のキーを持つ完全なエンティティを復元することが可能になる。

func postPut(c appengine.Context, in, out appengine_internal.ProtoMessage) error {
    req, ok := in.(*pb.PutRequest)
    if !ok {
        return fmt.Errorf("Not PutRequest: %#v", in)
    }
    resp, ok := out.(*pb.PutResponse)
    if !ok {
        return fmt.Errorf("Not PutResponse: %#v", out)
    }
    for i, entityProto := range req.GetEntity() {
        entity, err := protoToEntity(c, entityProto)
        if err != nil {
            return err
        }
        key, err := protoToKey(c, resp.GetKey()[i])
        if err != nil {
            return err
        }
        entity.Key = key
        fmt.Printf("%+v", entity)
    }
    return nil
}

获取数据存储

Get和Put相反,in只有键,out包含被Get的实体。

    • in: *pb.GetRequest

out: *pb.GetResponse

請參考代碼以了解更多詳細信息。

数据存储删除

在Delete中,in会带有一个键,但out既没有键也没有实体。

    • in: *pb.DeleteRequest

out: *pb.DeleteResponse

烧烤

为了实现Datastore -> BigQuery的实时插入,我们开发了一个库,这是最初的目标。

烧烤

我的名字没有任何特别的含义,只是想用3、4个字母简单搞定。bbq是一个基于aespy的库,用于隐藏aespy,让用户不必意识到appengine_internal。
关于bbq的实现细节,可以看源代码,下面是一个在应用程序中使用的例子。

func handleAddNewPerson(w http.ResponseWriter, r *http.Request) {
    b := bbq.NewBBQ(&bbq.Option{Log: true})
    b.AddKind("Person", "aespy", "person")
    c, ch := b.Hook(r)
    p := &Person{Name: r.URL.Query().Get("name")}
    if err := p.Save(c); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    resp, err := json.Marshal(p)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
    if err := <-ch; err != nil {
        c.Debugf(err.Error())
    }
    w.Write(resp)
}

与烧烤有关的只有前三行。

    b := bbq.NewBBQ(&bbq.Option{Log: true})
    b.AddKind("Person", "bbqsample", "person")
    c, ch := b.Hook(r)

b.AddKind函数需要传入要插入到BigQuery的Kind名称、目标数据集ID和表格ID。添加完Kind后,可以通过c, ch := b.Hook(r)对其进行钩子处理。返回值的第一个是被aespy包装过的appengine.Context,第二个是用于接收RPC处理器返回值的通道。如果忽略这个通道,可以在插入完成之前结束与客户端的通信,并通过<-ch来等待插入处理的结束。

实际运用中,我觉得将AddKind准备在应用程序中的静态位置,然后在每个终端点上根据*http.request的情况来进行必要的Hook是一个聪明的方法。由于这个库刚刚创建,所以还需要考虑是否已经成为有用的API。

請提供中文版的問題。

由于在RPC中传输的pb(Protocol Buffer)已完全与golang分离,因此结构的信息已完全丢失。因此,在钩子(hook)中无法将需要的类似元数据的信息保存在结构的字段标签中。反过来说,由于可以处理实际传递到Datastore的实体本身,因此对于实时备份到BigQuery的要求来说,这已经是足够了。目前,没有特别考虑解决方法。

感謝詞

aespy和bbq在银周期间参加了一个连续28小时的“默默会议”(实际上更接近一个没有明确目标的黑客马拉松活动)。从星期一13点持续到星期二17点,虽然这次活动很有趣,但对体力的考验也很大,所以下次我打算带着睡袋来。我也想再次参加。

bannerAds