罗布·派克的《Go语言并发处理设计模式》后篇

这是上一篇的续篇。

让我们将上次的设计模式应用到谷歌搜索上进行讨论。

谷歌1.0

首先是一个没有并行处理的顺序处理示例。
我们先创建一个返回类型为Result的假搜索函数type Search func(query string) Result,并命名为fakeSearch函数。
假搜索函数Search通过使用time.Sleep来休眠一段时间,模拟进行搜索的情况。

package fake

import (
    "fmt"
    "math/rand"
    "time"
)

var (
    Web   = fakeSearch("web")
    Image = fakeSearch("image")
    Video = fakeSearch("video")
)

type Search func(query string) Result

func fakeSearch(kind string) Search {
    return func(query string) Result {
        time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
        return Result(fmt.Sprintf("%s result for %q\n", kind, query))
    }
}

type Result string

以下是主函数。在Google函数中,我们使用顺序处理将之前的虚拟搜索函数调用了3次。

package main

import (
    "runtime"
    "time"
    "math/rand"
    "fmt"
    "github.com/tfutada/robpike/google/fake"
)

func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())
    rand.Seed(time.Now().UnixNano())

    start := time.Now()
    results := Google("golang")
    elapsed := time.Since(start)
    fmt.Println(elapsed)
    fmt.Println(results)
}

func Google(query string) (results []fake.Result) {
    results = append(results, fake.Web(query))
    results = append(results, fake.Image(query))
    results = append(results, fake.Video(query))
    return
}

执行结果

/usr/local/go/bin/go run /Users/taka/go/src/github.com/tfutada/robpike/google/google1/main.go
160.479531ms
[web result for "golang"
 image result for "golang"
 video result for "golang"
]

Process finished with exit code 0

谷歌2.0

使用Go协程进行并发处理。使用FanIn模式的前端。将每个虚拟搜索功能包装在匿名函数中,并通过通道返回结果。

func Google(query string) (results []fake.Result) {

    c := make(chan fake.Result)
    go func() { c <- fake.Web(query) }()
    go func() { c <- fake.Image(query) }()
    go func() { c <- fake.Video(query) }()
    for i := 0; i < 3; i++ {
        results = append(results, <-c)
    }

    return
}

执行结果

这次比上次快了。最慢的搜索结果将成为整个处理时间。

/usr/local/go/bin/go run /Users/taka/go/src/github.com/tfutada/robpike/google/google2/main.go
82.139627ms
[web result for "golang"
 image result for "golang"
 video result for "golang"
]

Process finished with exit code 0

谷歌2.1

在上一次的超时模式中,当经过一定时间后,将加入终止搜索的处理。

func Google(query string) (results []fake.Result) {

    c := make(chan fake.Result)
    go func() { c <- fake.Web(query) }()
    go func() { c <- fake.Image(query) }()
    go func() { c <- fake.Video(query) }()

    timeout := time.After(80 * time.Millisecond)

    for i := 0; i < 3; i++ {
        select {
        case s := <- c:
            results = append(results, s)
        case <- timeout:
            fmt.Println("Timed out!")
            return
        }
    }

    return
}

执行结果

可以看出处理被截断于80毫秒,并且未执行图像搜索。

/usr/local/go/bin/go run /Users/taka/go/src/github.com/tfutada/robpike/google/google3/main.go
Timed out!
80.690454ms
[web result for "golang"
 video result for "golang"
]

Process finished with exit code 0

谷歌3.0

下一个调整是为了增加搜索处理的冗余性。
它会同时并行执行相同的搜索处理,并使用最快返回的结果作为正确结果。

func main() {

    runtime.GOMAXPROCS(runtime.NumCPU())
    rand.Seed(time.Now().UnixNano())

    start := time.Now()

    results := First("golang",
        fake.FakeSearch("replica 1"),
        fake.FakeSearch("replica 2"),
        fake.FakeSearch("replica 3"),
        fake.FakeSearch("replica 4"),
        fake.FakeSearch("replica 5")) // 注) fakeSearch -> FakeSearch

    elapsed := time.Since(start)
    fmt.Println(elapsed)
    fmt.Println(results)
}

func First(query string, replicas ...fake.Search) fake.Result {

    c := make(chan fake.Result)
    searchReplica := func(i int) {c <- replicas[i](query)}
    for i := range replicas {
        go searchReplica(i) // クロージャだとうまくいかない。。。
    }
    return <-c
}

// 最初のfakeSearchを大文字に変えて、メイン関数から見れるようにしました。
func FakeSearch(kind string) Search {
    // 同じ

执行结果

/usr/local/go/bin/go run /Users/taka/go/src/github.com/tfutada/robpike/google/google3/main.go
2.422057ms
replica 2 result for "golang"


Process finished with exit code 0

基于这一点,通过与之前的谷歌2.1的超时机制结合,丢弃那些耗时的副本,可以更高效地进行搜索。

bannerAds