mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
250 lines
5 KiB
Go
250 lines
5 KiB
Go
|
package fasthttp
|
||
|
|
||
|
import (
|
||
|
"net"
|
||
|
"runtime"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// workerPool serves incoming connections via a pool of workers
|
||
|
// in FILO order, i.e. the most recently stopped worker will serve the next
|
||
|
// incoming connection.
|
||
|
//
|
||
|
// Such a scheme keeps CPU caches hot (in theory).
|
||
|
type workerPool struct {
|
||
|
// Function for serving server connections.
|
||
|
// It must leave c unclosed.
|
||
|
WorkerFunc ServeHandler
|
||
|
|
||
|
MaxWorkersCount int
|
||
|
|
||
|
LogAllErrors bool
|
||
|
|
||
|
MaxIdleWorkerDuration time.Duration
|
||
|
|
||
|
Logger Logger
|
||
|
|
||
|
lock sync.Mutex
|
||
|
workersCount int
|
||
|
mustStop bool
|
||
|
|
||
|
ready []*workerChan
|
||
|
|
||
|
stopCh chan struct{}
|
||
|
|
||
|
workerChanPool sync.Pool
|
||
|
|
||
|
connState func(net.Conn, ConnState)
|
||
|
}
|
||
|
|
||
|
// workerChan is the per-worker mailbox.
type workerChan struct {
	// lastUseTime is the moment the worker was last returned to the pool.
	lastUseTime time.Time

	// ch delivers connections to the worker; a nil value tells
	// the worker to exit.
	ch chan net.Conn
}
|
||
|
|
||
|
func (wp *workerPool) Start() {
|
||
|
if wp.stopCh != nil {
|
||
|
panic("BUG: workerPool already started")
|
||
|
}
|
||
|
wp.stopCh = make(chan struct{})
|
||
|
stopCh := wp.stopCh
|
||
|
wp.workerChanPool.New = func() interface{} {
|
||
|
return &workerChan{
|
||
|
ch: make(chan net.Conn, workerChanCap),
|
||
|
}
|
||
|
}
|
||
|
go func() {
|
||
|
var scratch []*workerChan
|
||
|
for {
|
||
|
wp.clean(&scratch)
|
||
|
select {
|
||
|
case <-stopCh:
|
||
|
return
|
||
|
default:
|
||
|
time.Sleep(wp.getMaxIdleWorkerDuration())
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (wp *workerPool) Stop() {
|
||
|
if wp.stopCh == nil {
|
||
|
panic("BUG: workerPool wasn't started")
|
||
|
}
|
||
|
close(wp.stopCh)
|
||
|
wp.stopCh = nil
|
||
|
|
||
|
// Stop all the workers waiting for incoming connections.
|
||
|
// Do not wait for busy workers - they will stop after
|
||
|
// serving the connection and noticing wp.mustStop = true.
|
||
|
wp.lock.Lock()
|
||
|
ready := wp.ready
|
||
|
for i := range ready {
|
||
|
ready[i].ch <- nil
|
||
|
ready[i] = nil
|
||
|
}
|
||
|
wp.ready = ready[:0]
|
||
|
wp.mustStop = true
|
||
|
wp.lock.Unlock()
|
||
|
}
|
||
|
|
||
|
func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
|
||
|
if wp.MaxIdleWorkerDuration <= 0 {
|
||
|
return 10 * time.Second
|
||
|
}
|
||
|
return wp.MaxIdleWorkerDuration
|
||
|
}
|
||
|
|
||
|
func (wp *workerPool) clean(scratch *[]*workerChan) {
|
||
|
maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
|
||
|
|
||
|
// Clean least recently used workers if they didn't serve connections
|
||
|
// for more than maxIdleWorkerDuration.
|
||
|
criticalTime := time.Now().Add(-maxIdleWorkerDuration)
|
||
|
|
||
|
wp.lock.Lock()
|
||
|
ready := wp.ready
|
||
|
n := len(ready)
|
||
|
|
||
|
// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
|
||
|
l, r, mid := 0, n-1, 0
|
||
|
for l <= r {
|
||
|
mid = (l + r) / 2
|
||
|
if criticalTime.After(wp.ready[mid].lastUseTime) {
|
||
|
l = mid + 1
|
||
|
} else {
|
||
|
r = mid - 1
|
||
|
}
|
||
|
}
|
||
|
i := r
|
||
|
if i == -1 {
|
||
|
wp.lock.Unlock()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
*scratch = append((*scratch)[:0], ready[:i+1]...)
|
||
|
m := copy(ready, ready[i+1:])
|
||
|
for i = m; i < n; i++ {
|
||
|
ready[i] = nil
|
||
|
}
|
||
|
wp.ready = ready[:m]
|
||
|
wp.lock.Unlock()
|
||
|
|
||
|
// Notify obsolete workers to stop.
|
||
|
// This notification must be outside the wp.lock, since ch.ch
|
||
|
// may be blocking and may consume a lot of time if many workers
|
||
|
// are located on non-local CPUs.
|
||
|
tmp := *scratch
|
||
|
for i := range tmp {
|
||
|
tmp[i].ch <- nil
|
||
|
tmp[i] = nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (wp *workerPool) Serve(c net.Conn) bool {
|
||
|
ch := wp.getCh()
|
||
|
if ch == nil {
|
||
|
return false
|
||
|
}
|
||
|
ch.ch <- c
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// workerChanCap is the buffer size of every worker channel.
// It is computed once, at package initialization.
var workerChanCap = func() int {
	// With GOMAXPROCS>1 use a buffered (non-blocking) channel, since
	// otherwise the Serve caller (Acceptor) may lag accepting new
	// connections if WorkerFunc is CPU-bound.
	if runtime.GOMAXPROCS(0) > 1 {
		return 1
	}

	// With GOMAXPROCS=1 use an unbuffered (blocking) channel.
	// This immediately switches Serve to WorkerFunc, which results
	// in higher performance (under go1.5 at least).
	return 0
}()
|
||
|
|
||
|
func (wp *workerPool) getCh() *workerChan {
|
||
|
var ch *workerChan
|
||
|
createWorker := false
|
||
|
|
||
|
wp.lock.Lock()
|
||
|
ready := wp.ready
|
||
|
n := len(ready) - 1
|
||
|
if n < 0 {
|
||
|
if wp.workersCount < wp.MaxWorkersCount {
|
||
|
createWorker = true
|
||
|
wp.workersCount++
|
||
|
}
|
||
|
} else {
|
||
|
ch = ready[n]
|
||
|
ready[n] = nil
|
||
|
wp.ready = ready[:n]
|
||
|
}
|
||
|
wp.lock.Unlock()
|
||
|
|
||
|
if ch == nil {
|
||
|
if !createWorker {
|
||
|
return nil
|
||
|
}
|
||
|
vch := wp.workerChanPool.Get()
|
||
|
ch = vch.(*workerChan)
|
||
|
go func() {
|
||
|
wp.workerFunc(ch)
|
||
|
wp.workerChanPool.Put(vch)
|
||
|
}()
|
||
|
}
|
||
|
return ch
|
||
|
}
|
||
|
|
||
|
func (wp *workerPool) release(ch *workerChan) bool {
|
||
|
ch.lastUseTime = time.Now()
|
||
|
wp.lock.Lock()
|
||
|
if wp.mustStop {
|
||
|
wp.lock.Unlock()
|
||
|
return false
|
||
|
}
|
||
|
wp.ready = append(wp.ready, ch)
|
||
|
wp.lock.Unlock()
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (wp *workerPool) workerFunc(ch *workerChan) {
|
||
|
var c net.Conn
|
||
|
|
||
|
var err error
|
||
|
for c = range ch.ch {
|
||
|
if c == nil {
|
||
|
break
|
||
|
}
|
||
|
|
||
|
if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
|
||
|
errStr := err.Error()
|
||
|
if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
|
||
|
strings.Contains(errStr, "reset by peer") ||
|
||
|
strings.Contains(errStr, "request headers: small read buffer") ||
|
||
|
strings.Contains(errStr, "unexpected EOF") ||
|
||
|
strings.Contains(errStr, "i/o timeout")) {
|
||
|
wp.Logger.Printf("error when serving connection %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
|
||
|
}
|
||
|
}
|
||
|
if err == errHijacked {
|
||
|
wp.connState(c, StateHijacked)
|
||
|
} else {
|
||
|
_ = c.Close()
|
||
|
wp.connState(c, StateClosed)
|
||
|
}
|
||
|
c = nil
|
||
|
|
||
|
if !wp.release(ch) {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
wp.lock.Lock()
|
||
|
wp.workersCount--
|
||
|
wp.lock.Unlock()
|
||
|
}
|