我创建了一个钩取GAE/Go的RPC的库称为”aespy”
@laco0416,我是一个用于介入GAE/Go的RPC并利用通信内容进行操作的库的开发者。我将在下面写一些关于这个库的工作原理以及在查看GAE/Go的RPC时需要注意的事项。
laco0416/aespy的信息,请给出一个完全用中文解释的选项。
请将以下内容用中文进行表达:
环境
无论是GAE/Go还是其他GCP服务(如Datastore、Memcache、UrlFetch等),它们都通过RPC进行通信。RPC使用一种名为Protocol Buffer(简称pb)的格式来序列化对象,这使得反序列化变得简单。然而,GAE/J已经预先准备了名为ApiProxy和Delegate的机制,可以相对容易地干预RPC。但是,GAE/Go目前还没有提供相应的API。
顺便说一下,GAE/Go的Datastore没有提供可以获取到Context的事件处理程序。尽管可以通过实现PropertyLoadSaver来介入Save和Load操作,但由于无法获取AppEngine的Context,因此可以执行的处理操作非常有限,这是一个问题。例如,无法在回调函数中自动将实体保存到Memcache中,因此通常只能在应用程序中手动执行这个过程。
一个名为goon的库可以自动将Datastore的操作转换为Memcache的操作。这个库在内部处理这些过程,非常方便。现在,如果没有goon,全部都要手动编写,实在是很痛苦。但是它只能自动缓存到Memcache,其他的事情还是需要手动编写。
今回aespyを作ったモチベーションは、「DatastoreにPutされるエンティティを自動でBigQueryにStream Insertする」ことである。goonのようにライブラリを通じてDatastoreを操作するのではなく、RPCにフックするようにしたのはgoonと併用したいというのもあるし、極力既存のアプリケーションのコードを変えずに導入したいからだ。
aespyの仕組み
かつてGAE/JにはMemvacheというライブラリがあった。これはGAE/JのApiProxyとDelegateを使ってDatastoreへのRPCを乗っ取り、自動でMemcacheの操作に置き換えるライブラリである。今回BigQueryに流し込むライブラリを作るにあたって、まずはGAE/GoにおけるApiProxyとDelegateを作る必要があった。
GAE/Go的RPC
在GAE/Go中,appengine.Context是一切的。如果获取不到Context,几乎什么都做不了;而反过来说,只要有Context,就可以做任何事情。这是因为需要调用Context.Call()来实际执行RPC。
Context是一个接口,只要满足了实现,就可以创建一个包装了appengine.Context的aespy.Context。要创建一个什么都不做的简单包装器,只需编写以下代码。我在aetest的实现中包装了Context并接管了RPC,这对我来说非常有参考意义。我之前对于是否可以在库中使用appengine_internal存在一些疑虑,但实际上进行了部署和使用后完全没有问题,应该是没问题的。
package aespy
import (
"net/http"
"appengine"
"appengine_internal"
)
// Context はappengine.Contextを継承したinterfaceである
type Context interface {
appengine.Context
}
// NewContext はappengine.ContextをラップしたContextを返す
func NewContext(req *http.Request) Context {
c := &context{Context: appengine.NewContext(req)}
return c
}
// FromContext はすでにあるappengine.Contextをラップする
func FromContext(ctx appengine.Context) Context {
c := &context{Context: ctx}
return c
}
// aespy.Contextの実装をする構造体
type context struct {
appengine.Context
}
func (c *context) Request() interface{} { return c.Context.Request() }
func (c *context) FullyQualifiedAppID() string { return c.Context.FullyQualifiedAppID() }
func (c *context) Debugf(format string, args ...interface{}) { c.Context.Debugf(format, args...) }
func (c *context) Infof(format string, args ...interface{}) { c.Context.Infof(format, args...) }
func (c *context) Warningf(format string, args ...interface{}) { c.Context.Warningf(format, args...) }
func (c *context) Errorf(format string, args ...interface{}) { c.Context.Errorf(format, args...) }
func (c *context) Criticalf(format string, args ...interface{}) { c.Context.Criticalf(format, args...) }
func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
return c.Context.Call(service, method, in, out, opts)
}
我想你已经明白了
func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
return c.Context.Call(service, method, in, out, opts)
}
如果在RPC调用中加入钩子处理,那么aespy就会完成。
我希望你能阅读aespy的源代码,以了解实际上是如何挂钩RPC的机制。虽然没有文件说明,但这个库并不是很庞大,如果有15分钟的时间我认为你可以完全理解它。
解释Context.Call
关于Context.Call机制的解释,我想先说明一下。当我听到RPC时,我无意识地认为它是异步处理(尽管没有特定的依据),然而这个Call方法在内部无论如何都是同步处理的行为。
对于每个形式参数,简要解释如下:
服务:字串
RPC的目标。如果是针对Datastore的RPC,则可能会使用datastore_v3等。
方法:字符串
如果是通过RPC调用的函数,如果是Datastore的Put操作,就会包含Put。
在:appengine_internal.ProtoMessage中
根据服务和方法的需求,在使用pb的最基本接口发送请求内容时,需要进行类型断言。
重要的是,在按钮前后不改变。
输出:appengine_internal.ProtoMessage
通过RPC返回的响应内容。将其作为pb的最基本接口传递,因此需根据service和method进行类型断言。
重要的是,out是一个参考,在调用之前为空,但在调用之后会有值填充进去。
选项: *appengine_internal.CallOptions
RPC的配置。它是通过结构体来表示的,以便能够清楚地识别其类型。虽然它具有设置超时等功能,但很少被钩子函数使用。
处理RPC的关键是明确了解其中的输入(in)和输出(out)。如果将RPC的前后处理分别命名为preCall和postCall,那么最基本的实现应该如下所示:
func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
c.preCall(service, method, in) //この段階ではinにしか意味がない
ret := c.Context.Call(service, method, in, out, opts) //本来のRPC
c.postCall(service, method, in, out) //ここでinとout両方に意味がある
return ret //本来のRPCの結果を返す
}
向数据存储器发送RPC请求
既然已经能够窥探到所有的RPC,现在可以着手进行对Datastore的RPC的正常处理,这是最初的目标。
数据存储.放置
在调用Datastore.Put时,传递给Context.Call的参数如下:
-
- service: “datastore_v3”
method: “Put”
in: *datastore.PutRequest
out: *datastore.PutResponse
datastore.PutRequestのdatastoreは”appengine/datastore”パッケージではなく、”appengine_internal/datastore”パッケージであるのに注意。
PutRequestは「Putされるエンティティ」を内包するpbである。PutされたエンティティはPurRequest.GetEntityで取得できる。注意する点は、GetEntityはEntityProto型の スライス を返すということである。DatastoreはPutでもGetでもDeleteでも内部的には**Multiを呼んでおり、常に複数で扱っているのが理由である。
func prePut(c appengine.Context, in appengine_internal.ProtoMessage) error {
req, ok := in.(*pb.PutRequest)
if !ok {
return fmt.Errorf("Not PutRequest: %#v", in)
}
for _, proto := range req.GetEntity() {
entity, err := protoToEntity(c, proto)
if err != nil {
return err
}
fmt.Printf("%+v", entity)
}
return nil
}
EntityProto型はDatastoreとやり取りするエンティティ1件を表す構造体である。エンティティは自身のキーとプロパティの情報を持っている。EntityProto型を扱いやすいよう独自のEntity型に変換するprotoToEntityを次のように実装した。
import (
"appengine"
"appengine/datastore"
pb "appengine_internal/datastore"
)
type Entity struct {
Key *datastore.Key
Kind string
Properties map[string]interface{}
}
func unwrapPropertyValue(pv *pb.PropertyValue) interface{} {
var value interface{}
switch {
case pv.BooleanValue != nil:
value = pv.GetBooleanValue()
case pv.Int64Value != nil:
value = pv.GetInt64Value()
case pv.DoubleValue != nil:
value = pv.GetDoubleValue()
case pv.StringValue != nil:
value = pv.GetStringValue()
case pv.Pointvalue != nil:
value = pv.GetPointvalue()
case pv.Referencevalue != nil:
value = pv.GetReferencevalue()
case pv.Uservalue != nil:
value = pv.GetUservalue()
default:
value = pv.String()
}
return value
}
func protoToEntity(c appengine.Context, proto *pb.EntityProto) (*Entity, error) {
entity := new(Entity)
key, err := protoToKey(c, proto.GetKey())
if err != nil {
return nil, err
}
entity.Key = key
entity.Kind = key.Kind()
entity.Properties = make(map[string]interface{})
for _, p := range proto.GetProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
// no index field
for _, p := range proto.GetRawProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
return entity, nil
}
func protoToKey(c appengine.Context, proto *pb.Reference) (*datastore.Key, error) {
var key *datastore.Key
for _, e := range proto.GetPath().GetElement() {
key = datastore.NewKey(c, e.GetType(), e.GetName(), e.GetId(), key)
if _, err := key.GobEncode(); err != nil {
return nil, err
}
}
return key, nil
}
最需要注意的是datastore中的”noindex”属性处理。通过EntityProto.GetProperty仅能获取带索引的属性,而需要使用EntityProto.GetRawProperty来获取”noindex”属性。
for _, p := range proto.GetProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
// no index field
for _, p := range proto.GetRawProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
Callの前のinに含まれるエンティティのキーはアプリケーション側でNewKeyもしくはNewIncompleteKeyしたままのキーである。NewIncompleteKeyの場合はキーは空であるため、実際にPutされ作成されたKeyを取得するにはCallの後のoutを見る必要がある。*datastore.PutResponseはPutされたエンティティのキーだけを返すが、inと併用することで、自身のキーを持つ完全なエンティティを復元することが可能になる。
func postPut(c appengine.Context, in, out appengine_internal.ProtoMessage) error {
req, ok := in.(*pb.PutRequest)
if !ok {
return fmt.Errorf("Not PutRequest: %#v", in)
}
resp, ok := out.(*pb.PutResponse)
if !ok {
return fmt.Errorf("Not PutResponse: %#v", out)
}
for i, entityProto := range req.GetEntity() {
entity, err := protoToEntity(c, entityProto)
if err != nil {
return err
}
key, err := protoToKey(c, resp.GetKey()[i])
if err != nil {
return err
}
entity.Key = key
fmt.Printf("%+v", entity)
}
return nil
}
获取数据存储
Get和Put相反,in只有键,out包含被Get的实体。
-
- in: *pb.GetRequest
out: *pb.GetResponse
請參考代碼以了解更多詳細信息。
数据存储删除
在Delete中,in会带有一个键,但out既没有键也没有实体。
-
- in: *pb.DeleteRequest
out: *pb.DeleteResponse
烧烤
为了实现Datastore -> BigQuery的实时插入,我们开发了一个库,这是最初的目标。
烧烤
我的名字没有任何特别的含义,只是想用3、4个字母简单搞定。bbq是一个基于aespy的库,用于隐藏aespy,让用户不必意识到appengine_internal。
关于bbq的实现细节,可以看源代码,下面是一个在应用程序中使用的例子。
func handleAddNewPerson(w http.ResponseWriter, r *http.Request) {
b := bbq.NewBBQ(&bbq.Option{Log: true})
b.AddKind("Person", "aespy", "person")
c, ch := b.Hook(r)
p := &Person{Name: r.URL.Query().Get("name")}
if err := p.Save(c); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := json.Marshal(p)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := <-ch; err != nil {
c.Debugf(err.Error())
}
w.Write(resp)
}
与烧烤有关的只有前三行。
b := bbq.NewBBQ(&bbq.Option{Log: true})
b.AddKind("Person", "bbqsample", "person")
c, ch := b.Hook(r)
b.AddKind函数需要传入要插入到BigQuery的Kind名称、目标数据集ID和表格ID。添加完Kind后,可以通过c, ch := b.Hook(r)对其进行钩子处理。返回值的第一个是被aespy包装过的appengine.Context,第二个是用于接收RPC处理器返回值的通道。如果忽略这个通道,可以在插入完成之前结束与客户端的通信,并通过<-ch来等待插入处理的结束。
实际运用中,我觉得将AddKind准备在应用程序中的静态位置,然后在每个终端点上根据*http.request的情况来进行必要的Hook是一个聪明的方法。由于这个库刚刚创建,所以还需要考虑是否已经成为有用的API。
請提供中文版的問題。
由于在RPC中传输的pb(Protocol Buffer)已完全与golang分离,因此结构的信息已完全丢失。因此,在钩子(hook)中无法将需要的类似元数据的信息保存在结构的字段标签中。反过来说,由于可以处理实际传递到Datastore的实体本身,因此对于实时备份到BigQuery的要求来说,这已经是足够了。目前,没有特别考虑解决方法。
感謝詞
aespy和bbq在银周期间参加了一个连续28小时的“默默会议”(实际上更接近一个没有明确目标的黑客马拉松活动)。从星期一13点持续到星期二17点,虽然这次活动很有趣,但对体力的考验也很大,所以下次我打算带着睡袋来。我也想再次参加。