Go言語による並列処理のデータ処理パイプライン

Go言語で並行性の高いデータ処理パイプラインを実現するには、WaitGroupを利用できます。

WaitGroupとは、複数の並列処理が完了するのを待機するカウント型セマフォ。Addメソッドでカウンタの値を増やし、Doneメソッドで減らし、Waitメソッドでカウンタが0になるまでブロックする。

WaitGroupを使用した高頻度データ処理パイプラインの例を次に示します。

package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
numWorkers := 10
dataChan := make(chan int, 100)
// 第一阶段,生成数据
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
dataChan <- i
}
close(dataChan)
}()
// 第二阶段,处理数据
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataChan {
result := processData(data)
fmt.Println(result)
}
}()
}
wg.Wait()
}
func processData(data int) int {
// 此处模拟数据处理过程
return data * 2
}

このサンプルでは、WaitGroupオブジェクト `wg` とデータの段階間でのやりとりに用いられるバッファ付きチャネル `dataChan` が最初に作成されています。

第1段階では、データを生成するgoroutineを起動し、データをdataChanチャネルに送信します。データの送信が完了したら、close(dataChan)を呼び出してチャネルを閉じます。

第2段階では、複数のgoroutineを起動して同時にデータを処理します。 各goroutineはdataChanからデータを受信し、processData関数を呼び出してデータを処理し、処理結果を出力します。

最後に、すべてのgoroutineの完了をwg.Wait()の呼び出しで待つ

データ生成フェーズとデータ処理フェーズ間ではチャネルを利用してデータを渡し、すべてのgoroutineが完了するのをWaitGroupで待ちます。

bannerAds