可以先看看上一篇沒Thread Pool,Limiter也好?真的嗎? 來個前情提要。
Worker Group其實基本概念跟Thread Pool很像,就是:
- 一堆Worker在跑,等著接收參數並且輸出結果
- 單一Worker擁有一個用來輸入參數的channel,以及一個用以輸出結果的channel
- Worker可被context的Done()終結,或者:
- 可被WaitGroup終結
不過由於go沒有generic(目前版本是1.16,在1.17/1.18會支援),所以這兩個channel都沒辦法寫得很漂亮,這也是大多數Worker Pool要不就是得用chan interface{}
來寫,不然就是寫不出來。不過Worker Group這種東西其實夠輕量,輕量到其實自己打造都是可以的,這邊就介紹一下怎麼自己打造一個Worker Pool,以及揭秘為什麼很多CLI/UI都需要有一個自己的UI Thread(go沒thread,所以稱為UI routine吧)。
所以我們可以假設一個流程:
- 產生20個worker thread,用一個channel把參數傳進去,用一個channel接出參數
- 把參數塞進channel
- 用WaitGroup來控制流程
那看起來的code應該大致上會像是這樣
//假設資料從csv讀入
data := GetFromCSV("laz_prd.csv", 1)
//輸入參數用的channel
c := make(chan string, len(data))
//存放結果用的channel
r := make(chan ProbeResult, len(data))
//用以控制流程的wait group
var wg sync.WaitGroup
wg.Add(len(data))
for i := 0; i < WORKER_SIZE; i++ {
//傳入該輸入用channel 輸出用channel以及waitgroup給worker
go worker(c, r, &wg)
}
//把參數灌進channel讓worker接收
for _, pid := range data {
c <- pid
}
wg.Wait()
//印出結果
for {
select {
case result := <- r:
PrintResult(r)
default: //這邊的default是必要的,不然channel r讀空了就會卡在select
break
}
}
我們可以來設計一下worker,應該大致上會長這樣:
func worker(p chan string, r chan string, wg *sync.WaitGroup) {
for {
result := &Result{}
select {
case input := <-p:
r <- CreateResultFromInput(input)
wg.Done()
}
}
}
其實這就是一個最基本的Worker Group了,有WORKER_SIZE
個Worker在處理資料。不過,這顯然有一點問題,就是:
- 如果以一個Application來講這樣是ok的,程式跑完就算還有殘留的go routine,問題也不大,因為會隨著程式結束而消滅。但是如果是一個Service的話這樣顯然會有routine leak的問題
- 這顯然是沒辦法邊跑邊印,一定得把所有東西都處理完才會一口氣印出來。
- (Optional) 沒sleep,就如同上一篇所提到的問題,會被當DDoS
第一個的話其實用context可以解決這問題,第二個的話解法很多,有一些人會選在wg.Wait()前面再開一個for + select去接,結束條件就設「接受到 len(data)
個結果」就好,不過比較通用的解法是額外開一個go routine專門處理UI問題。用context的話,整個code大概長這樣,就不另外再貼了
data := GetFromCSV("laz_prd.csv", 1)
c := make(chan string, len(data))
r := make(chan ProbeResult, len(data))
var wg sync.WaitGroup
wg.Add(len(data))
//增加一個context with cancel
//第二個參數cancel是一個func,執行他便可以讓該context傳出ctx.Done()的信號,告訴Worker說你該死了
ctx, cancel := context.WithCancel(context.Background())
for i := 0; i < WORKER_SIZE; i++ {
//傳入該context給worker
go worker(ctx, c, r, &wg)
}
for _, pid := range data {
c <- pid
}
wg.Wait()
//傳入ctx.Done()給每一個worker
cancel()
然後Worker也要實作能對ctx.Done()反應的自殺機制
//簽名要額外能收context
func worker(ctx context.Context, p chan string, r chan string, wg *sync.WaitGroup) {
for {
result := &Result{}
select {
//自殺指令,收到就會跳出回圈結束這個worker
case ctx<-Done():
break
case input := <-p:
r <- CreateResultFromInput(input)
time.Sleep(200 * time.Millisecond) //給個sleep避免request太兇
wg.Done()
}
}
}
這樣,worker就能正確被回收了。
Pingback: 沒Thread Pool,Limiter也好?真的嗎? - Fox Nest