メリークリスマス!

概要

非同期にログのようなものをサービスから送信するようなclientを作るとき、「サーバーをどう落とすか」というのはとても難しい問題だと思います。
特にお金がかかわるようなログだと送り漏れがないようにする必要がありますし「もう同期でいいんじゃね」と頭を抱えてしまいます。

非同期なのでバッファのような場所にデータがたまっているので、これを掃除してから落とす必要があるのが厄介の源です。

要件

clientは、あるjson型のログを別のRESTのシステムに投げるようなユースケースを想定していますが、ここではそこは端折ってio.Writerに書いています。
clientは、Sendで送られたメッセージをキャッシュし、

    • キャッシュがあるサイズに達したら

 

    • 定期的に

 

    最後にサーバーをShutdownするときに

の3つの条件でflushします。

解1:同期的に送信する

まずシンプルな解として同期的なものを用意しておきます。非同期処理はこれと同じ事を実現する必要があります。

type V1Client struct {
    w io.Writer
}

func NewV1Clienet(w io.Writer) *V1Client {
    return &V1Client{w}
}

func (c *V1Client) Send(message string) error {
    _, err := fmt.Fprintln(c.w, message)
    return err
}

これでなぜダメだったかといいますと、メッセージを束ねる必要があったためです。

解2:戦いはここから始まった

type V2Client struct {
    w    io.Writer
    q    chan string
    logs []string
    max  int
    mu   *sync.Mutex
    itvl time.Duration
}

func NewV2Client(w io.Writer, bufsize int, itvl time.Duration) *V2Client {
    return &V2Client{
        w:    w,
        logs: make([]string, bufsize)[:0],
        mu:   &sync.Mutex{},
        max:  bufsize,
        itvl: itvl,
    }
}

func (c *V2Client) Send(ctx context.Context, message string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.logs = append(c.logs, message)
    if len(c.logs) >= c.max {
        c.flush(ctx)
    }
}

func (c *V2Client) Start(ctx context.Context) {
    go func() {
        for {
            time.Sleep(c.itvl)
            c.mu.Lock()
            if err := c.flush(ctx); err != nil {
                fmt.Printf("error: %+v\n", err)
            }
            c.mu.Unlock()
            select {
            case <-ctx.Done():
                return
            }
        }
    }()
}

func (c *V2Client) flush(ctx context.Context) error {
    defer func() {
        c.logs = c.logs[:0]
    }()
    for _, m := range c.logs {
        if _, err := fmt.Fprintln(c.w, m); err != nil {
            return err
        }

    }
    return nil
}

非同期バージョンの最初の実装はこんな感じでした。ほかの言語のイメージで実装するとこんな感じになるのではないかと思います。
この実装には様々な問題点があります。

    • context cancelすることでプログラムを落とす、その時にlogsの中に残ったログの掃除をしていない

 

    • logsという配列を共有メモリのようにgoroutine間のデータのやり取りに使っている

 

    logsがオーバーフローしたとき、同期的にログを送信してしまっている

主にこの3点でしょうか。ほかにもいろいろあるとは思いますが。

解3:修正版

type V3Client struct {
    logs []*string    // log buffer
    q    chan string // channel to send log
    max  int           // max size of logs
    itvl time.Duration // log send interval
    w    io.Writer     // l2pc backup logger

    exit chan struct{}
    once *sync.Once
}


func NewV3Client(writer io.Writer, buffsize int, interval time.Duration) *V3Client {
    client := &V3Client{
        logs:         make([]string, buffsize)[:0],
        q:            make(chan string),
        max:          buffsize,
        itvl:         interval,
        w:            writer,
        once:         &sync.Once{},
    }
    return client
}

func (c *V3Client) Send(message string) {
    c.q <- message
}

func (c *V3Client) Start(ctx context.Context) {

    if c.exit != nil {
        fmt.Println("client is already started.")
        return
    }
    c.exit = make(chan struct{})

    go func() {

        t := time.NewTicker(c.itvl)
        defer t.Stop()

        for {
            select {
            case m := <-c.q:
                if err := c.tryFlush(ctx, m); err != nil {
                    fmt.Println("log send error: " + err.Error())
                }
            case <-t.C:
                if err := c.flush(ctx); err != nil {
                    fmt.Println("log send error: " + err.Error())
                }
            case <-c.exit:
                return
            }
        }
    }()
}

func (c *V3Client) Close(ctx context.Context) {
    c.once.Do(func() {
        c.exit <- struct{}{}
        if err := c.flushAll(ctx); err != nil {
            fmt.Println("log send error" + err.Error())
        }
    })

}

func (c *V3Client) tryFlush(ctx context.Context, msg string) error {
    c.logs = append(c.logs, msg)
    if len(c.logs) >= c.max {
        return c.flush(ctx)
    }
    return nil
}

func (c *V3Client) flushAll(ctx context.Context) error {
    close(c.q)
    all := c.logs
    for l := range c.q {
        all = append(all, l)
    }
    for {
        if len(all) < c.max {
            c.logs = all
            return c.flush(ctx)
        }
        c.logs, all = all[:c.max], all[c.max:]
        if err := c.flush(ctx); err != nil {
            return err
        }
    }
}

func (c *V3Client) flush(ctx context.Context) error {

    defer func() {
        c.logs = c.logs[:0]
    }()

    for _, l := range c.logs {
        if _, err := fmt.Fprintln(c.w, l); err != nil {
            return err
        }
    }
    return nil
}}

各メソッドはそれぞれ

client説明client#Startclientの非同期送信を開始する。goroutine部。client#Sendメッセージを送る(入力側)。業務ロジックが呼び出す。client$Close終了処理開始および、終了処理がおわるまで待機client#flush実送信部client#tryFlushメッセージを受け取り必要であればflushするclient#flushAll終了処理

です。#2と比較して特に違うのは「ノンブロッキング」だという事だと思います。
ほかの言語だと、こういったプログラムには必ずといっていいほどLockが顔を出します。ですが上にはありません。

    • SleepではなくTickerを使っている

 

    logsをgoroutineで共有せず、goroutine間の受け渡しにqというchanを使っている

ためです。
また、終了処理のために専用のexitというchを導入しました。なのでselect/caseで3つのchannelを待つコードになっています。

学んだこと

    1. goroutine間を渡すデータはlockして共有メモリで渡すのではなくchannelで渡す

 

    1. 一定時間置きに起動する処理はSleepを使わずTickerを使う

defer cl.Close()で終了処理を開始しつつかつ終了処理の終了まで待たせる
select/caseで複数のchannelを待つとき、同時に複数のchannelにデータが入ったときどのchannelが起動するかはrandom

func main(){
   ctx := context.Background()
   cl := NewV3Client(...)
   cl.Start(ctx)
   defer cl.Close(ctx)

   // start server with client
}

最終的なmainはこのようになりました。
割と普通なインターフェースに収まったように思います。

他の言語に慣れていると、「ノンブロッキングに無限forループを回す」というのはなかなか違和感があるのですが、確かにPythonのGILのように、過剰な範囲で足止めをしてしまい、パフォーマンスの足を引っ張る要因になりやすいのはわかる気がします。

一旦ここまでで。

bannerAds