一个Demo学会WorkerPool

本文转载自微信公众号「Golang来啦」,作者Seekload。转载本文请联系Golang来啦公众号。

成都创新互联公司是工信部颁发资质IDC服务器商,为用户提供优质的服务器托管服务

四哥水平有限,如有翻译或理解错误,烦请帮忙指出,感谢!

今天给大家分享一篇关于 workPool 的文章,这个平时大家应该用的比较多,一起来看下。

原文如下:

工作池是这样一个池子,会创建指定数量的 worker,这些 worker 能获取任务并处理。允许多个任务同时处理,但是需要维持固定数量的 worker 避免系统资源被过度使用。

通常有两种方式创建任务池:

  • 一种是预先创建固定数量的 worker;
  • 另外一种是当有需要的时候才会创建 worker,当然也会有数量限制;

本文将与大家一起讨论第一种方式。当我们预先知道有许多任务需要同时运行,并且很大概率会用上最大数量的 worker,通常会采用这种方式。

为了演示,我们先创建 Worker 结构体,它获取任务并执行。

 
 
 
  1. import ( 
  2.  "fmt" 
  3.  
  4. // Worker ... 
  5. type Worker struct { 
  6.  ID       int 
  7.  Name     string 
  8.  StopChan chan bool 
  9.  
  10. // Start ... 
  11. func (w *Worker) Start(jobQueue chan Job) { 
  12.  w.StopChan = make(chan bool) 
  13.  successChan := make(chan bool) 
  14.  
  15.  go func() { 
  16.   successChan <- true 
  17.   for { 
  18.    // take job 
  19.    job := <-jobQueue 
  20.    if job != nil { 
  21.     job.Start(w) 
  22.    } else { 
  23.     fmt.Printf("worker %s to be stopped\n", w.Name) 
  24.     w.StopChan <- true 
  25.     break 
  26.    } 
  27.   } 
  28.  }() 
  29.  
  30.  // wait for the worker to start 
  31.  <-successChan 
  32.  
  33. // Stop ... 
  34. func (w *Worker) Stop() { 
  35.  // wait for the worker to stop, blocking 
  36.  _ = <-w.StopChan 
  37.  fmt.Printf("worker %s stopped\n", w.Name) 

Worker 有一些属性保存当前的状态,另外还声明了两个方法分别用于启动、停止 worker。

在 Start() 方法里,创建了两个 channel 分别用于 worker 的启动和停止。最重要的是 for 循环里面,worker 会一直等待获取 job 并可执行的直到任务队列关闭。

Job 是包含单个方法 Start() 的接口,所以只要实现 Start() 方法就可以有不同类型的 job。

 
 
 
  1. // Job ... 
  2. type Job interface { 
  3.  Start(worker *Worker) error 

一旦 Worker 确定之后,接下来就是创建 pool 来管理 workers。

 
 
 
  1. import ( 
  2.  "fmt" 
  3.  "sync" 
  4.  
  5. // Pool ... 
  6. type Pool struct { 
  7.  Name string 
  8.  
  9.  Size    int 
  10.  Workers []*Worker 
  11.  
  12.  QueueSize int 
  13.  Queue     chan Job 
  14.  
  15. // Initiualize ... 
  16. func (p *Pool) Initialize() { 
  17.  // maintain minimum 1 worker 
  18.  if p.Size < 1 { 
  19.   p.Size = 1 
  20.  } 
  21.  p.Workers = []*Worker{} 
  22.  for i := 1; i <= p.Size; i++ { 
  23.   worker := &Worker{ 
  24.    ID:   i - 1, 
  25.    Name: fmt.Sprintf("%s-worker-%d", p.Name, i-1), 
  26.   } 
  27.   p.Workers = append(p.Workers, worker) 
  28.  } 
  29.  
  30.  // maintain min queue size as 1 
  31.  if p.QueueSize < 1 { 
  32.   p.QueueSize = 1 
  33.  } 
  34.  p.Queue = make(chan Job, p.QueueSize) 
  35.  
  36. // Start ... 
  37. func (p *Pool) Start() { 
  38.  for _, worker := range p.Workers { 
  39.   worker.Start(p.Queue) 
  40.  } 
  41.  fmt.Println("all workers started") 
  42.  
  43. // Stop ... 
  44. func (p *Pool) Stop() { 
  45.  close(p.Queue) // close the queue channel 
  46.  
  47.  var wg sync.WaitGroup 
  48.  for _, worker := range p.Workers { 
  49.   wg.Add(1) 
  50.   go func(w *Worker) { 
  51.    defer wg.Done() 
  52.  
  53.    w.Stop() 
  54.   }(worker) 
  55.  } 
  56.  wg.Wait() 
  57.  fmt.Println("all workers stopped") 

Pool 包含 worker 切片和用于保存 job 的队列。worker 的数量在初始化的时候是可以自定义。

关键点在 Stop() 的逻辑,当它被调用时,会先关闭 job 队列,worker 便会从 job 队列读到 nil,接着就会关闭对应的 worker。接着在 for 循环里,等待 worker 并发地停止直到最后一个 worker 停止。

为了演示整体逻辑,下面的例子展示了一个仅仅输出值的 job。

 
 
 
  1. import "fmt" 
  2.  
  3. func main() { 
  4.  pool := &Pool{ 
  5.   Name:      "test", 
  6.   Size:      5, 
  7.   QueueSize: 20, 
  8.  } 
  9.  pool.Initialize() 
  10.  pool.Start() 
  11.         defer pool.Stop() 
  12.  
  13.  for i := 1; i <= 100; i++ { 
  14.   job := &PrintJob{ 
  15.    Index: i, 
  16.   } 
  17.   pool.Queue <- job 
  18.  } 
  19.  
  20. // PrintJob ... 
  21. type PrintJob struct { 
  22.  Index int 
  23.  
  24. func (pj *PrintJob) Start(worker *Worker) error { 
  25.  fmt.Printf("job %s - %d\n", worker.Name, pj.Index) 
  26.  return nil 

如果你看了上面的代码逻辑,就会发现很简单,创建了有 5 个 worker 的工作池并且 job 队列的大小是 20。

接着,模拟 job 创建和处理过程:一旦 job 被创建就会 push 到任务队列里,等待着的 worker 便会从队列里取出 job 并处理。

类似下面这样的输出:

 
 
 
  1. all workers started 
  2. job test-worker-3 - 4 
  3. job test-worker-3 - 6 
  4. job test-worker-3 - 7 
  5. job test-worker-3 - 8 
  6. job test-worker-3 - 9 
  7. job test-worker-3 - 10 
  8. job test-worker-3 - 11 
  9. job test-worker-3 - 12 
  10. job test-worker-3 - 13 
  11. job test-worker-3 - 14 
  12. job test-worker-3 - 15 
  13. job test-worker-3 - 16 
  14. job test-worker-3 - 17 
  15. job test-worker-3 - 18 
  16. job test-worker-3 - 19 
  17. job test-worker-3 - 20 
  18. worker test-worker-3 to be stopped 
  19. job test-worker-4 - 5 
  20. job test-worker-0 - 1 
  21. worker test-worker-3 stopped 
  22. job test-worker-2 - 3 
  23. worker test-worker-2 to be stopped 
  24. worker test-worker-2 stopped 
  25. worker test-worker-4 to be stopped 
  26. worker test-worker-4 stopped 
  27. worker test-worker-0 to be stopped 
  28. worker test-worker-0 stopped 
  29. job test-worker-1 - 2 
  30. worker test-worker-1 to be stopped 
  31. worker test-worker-1 stopped 
  32. all workers stopped 

via:https://www.pixelstech.net/article/1611483826-Demo-on-creating-worker-pool-in-GoLang

作者:sonic0002

本文标题:一个Demo学会WorkerPool
标题链接:http://www.csdahua.cn/qtweb/news32/174782.html

网站建设、网络推广公司-快上网,是专注品牌与效果的网站制作,网络营销seo公司;服务项目有等

广告

声明:本网站发布的内容(图片、视频和文字)以用户投稿、用户转载内容为主,如果涉及侵权请尽快告知,我们将会在第一时间删除。文章观点不代表本网站立场,如需处理请联系客服。电话:028-86922220;邮箱:631063699@qq.com。内容未经允许不得转载,或转载时需注明来源: 快上网