How to implement and apply Go coroutine pools

In Golang, a goroutine pool can be implemented using goroutines and channels. Below is a simple example demonstrating how to use a goroutine pool to handle tasks.

package main

import (
	"fmt"
	"sync"
)

type Worker struct {
	ID     int
	Task   chan int
	Done   chan bool
	WorkerPool chan chan int
}

func NewWorker(id int, workerPool chan chan int) *Worker {
	return &Worker{
		ID:         id,
		Task:       make(chan int),
		Done:       make(chan bool),
		WorkerPool: workerPool,
	}
}

func (w *Worker) Start() {
	go func() {
		for {
			// 把自己的任务通道注册到工作池
			w.WorkerPool <- w.Task

			select {
			case task := <-w.Task:
				// 处理任务
				fmt.Printf("Worker %d processing task %d\n", w.ID, task)
			case <-w.Done:
				// 任务完成
				fmt.Printf("Worker %d stopping\n", w.ID)
				return
			}
		}
	}()
}

func (w *Worker) Stop() {
	go func() {
		w.Done <- true
	}()
}

type Pool struct {
	WorkerPool  chan chan int
	Tasks       chan int
	MaxWorkers  int
	WaitGroup   sync.WaitGroup
}

func NewPool(maxWorkers, maxTasks int) *Pool {
	return &Pool{
		WorkerPool:  make(chan chan int, maxWorkers),
		Tasks:       make(chan int, maxTasks),
		MaxWorkers:  maxWorkers,
	}
}

func (p *Pool) Start() {
	// 启动协程池中的工作协程
	for i := 0; i < p.MaxWorkers; i++ {
		worker := NewWorker(i, p.WorkerPool)
		worker.Start()
	}

	go p.dispatch()
}

func (p *Pool) dispatch() {
	for {
		select {
		case task := <-p.Tasks:
			workerTask := <-p.WorkerPool
			// 分发任务给空闲的工作协程
			workerTask <- task
		}
	}
}

func main() {
	pool := NewPool(3, 10)
	pool.Start()

	// 添加任务到任务队列
	for i := 0; i < 10; i++ {
		pool.Tasks <- i
	}

	pool.WaitGroup.Wait()
}

In the example above, we defined a Worker struct that includes a task channel Task and a completion channel Done. When a Worker is started, it registers its task channel in the worker pool and waits for tasks to arrive. Upon receiving a task from the task channel, it processes it. Once the task is completed, it notifies the main thread through the completion channel.

The Pool structure consists of a working coroutine pool and a task channel. In the Start method, we create a maximum of workers working coroutines and start them. Additionally, we also initiate a dispatch coroutine that is responsible for receiving tasks from the task channel and assigning them to idle working coroutines.

In the main function, we created a coroutine pool and added 10 tasks to the task channel. Finally, we used WaitGroup to wait for all tasks to be completed.

This is a simple implementation and application of a Golang goroutine pool. You can customize and modify it according to your needs.

bannerAds