VictoriaMetrics/vendor/github.com/VictoriaMetrics/fasthttp/lbclient.go
Aliaksandr Valialkin b6d88bac04 vendor: use github.com/VictoriaMetrics/fasthttp instead of github.com/fasthttp/fasthttp
The upstream fasthttp may contain issues like 996610f021 ,
plus a code that isn't used by VictoriaMetrics. So let's use a private copy under our control instead.
2020-04-29 16:43:09 +03:00

183 lines
4.4 KiB
Go

package fasthttp
import (
"sync"
"sync/atomic"
"time"
)
// BalancingClient is the interface for clients, which may be passed
// to LBClient.Clients.
type BalancingClient interface {
DoDeadline(req *Request, resp *Response, deadline time.Time) error
PendingRequests() int
}
// LBClient balances requests among available LBClient.Clients.
//
// It has the following features:
//
// - Balances load among available clients using 'least loaded' + 'round robin'
// hybrid technique.
// - Dynamically decreases load on unhealthy clients.
//
// It is forbidden copying LBClient instances. Create new instances instead.
//
// It is safe calling LBClient methods from concurrently running goroutines.
type LBClient struct {
noCopy noCopy
// Clients must contain non-zero clients list.
// Incoming requests are balanced among these clients.
Clients []BalancingClient
// HealthCheck is a callback called after each request.
//
// The request, response and the error returned by the client
// is passed to HealthCheck, so the callback may determine whether
// the client is healthy.
//
// Load on the current client is decreased if HealthCheck returns false.
//
// By default HealthCheck returns false if err != nil.
HealthCheck func(req *Request, resp *Response, err error) bool
// Timeout is the request timeout used when calling LBClient.Do.
//
// DefaultLBClientTimeout is used by default.
Timeout time.Duration
cs []*lbClient
// nextIdx is for spreading requests among equally loaded clients
// in a round-robin fashion.
nextIdx uint32
once sync.Once
}
// DefaultLBClientTimeout is the default request timeout used by LBClient
// when calling LBClient.Do.
//
// The timeout may be overriden via LBClient.Timeout.
const DefaultLBClientTimeout = time.Second
// DoDeadline calls DoDeadline on the least loaded client
func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
return cc.get().DoDeadline(req, resp, deadline)
}
// DoTimeout calculates deadline and calls DoDeadline on the least loaded client
func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
return cc.get().DoDeadline(req, resp, deadline)
}
// Do calls calculates deadline using LBClient.Timeout and calls DoDeadline
// on the least loaded client.
func (cc *LBClient) Do(req *Request, resp *Response) error {
timeout := cc.Timeout
if timeout <= 0 {
timeout = DefaultLBClientTimeout
}
return cc.DoTimeout(req, resp, timeout)
}
func (cc *LBClient) init() {
if len(cc.Clients) == 0 {
panic("BUG: LBClient.Clients cannot be empty")
}
for _, c := range cc.Clients {
cc.cs = append(cc.cs, &lbClient{
c: c,
healthCheck: cc.HealthCheck,
})
}
// Randomize nextIdx in order to prevent initial servers'
// hammering from a cluster of identical LBClients.
cc.nextIdx = uint32(time.Now().UnixNano())
}
func (cc *LBClient) get() *lbClient {
cc.once.Do(cc.init)
cs := cc.cs
idx := atomic.AddUint32(&cc.nextIdx, 1)
idx %= uint32(len(cs))
minC := cs[idx]
minN := minC.PendingRequests()
if minN == 0 {
return minC
}
for _, c := range cs[idx+1:] {
n := c.PendingRequests()
if n == 0 {
return c
}
if n < minN {
minC = c
minN = n
}
}
for _, c := range cs[:idx] {
n := c.PendingRequests()
if n == 0 {
return c
}
if n < minN {
minC = c
minN = n
}
}
return minC
}
type lbClient struct {
c BalancingClient
healthCheck func(req *Request, resp *Response, err error) bool
penalty uint32
}
func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
err := c.c.DoDeadline(req, resp, deadline)
if !c.isHealthy(req, resp, err) && c.incPenalty() {
// Penalize the client returning error, so the next requests
// are routed to another clients.
time.AfterFunc(penaltyDuration, c.decPenalty)
}
return err
}
func (c *lbClient) PendingRequests() int {
n := c.c.PendingRequests()
m := atomic.LoadUint32(&c.penalty)
return n + int(m)
}
func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
if c.healthCheck == nil {
return err == nil
}
return c.healthCheck(req, resp, err)
}
func (c *lbClient) incPenalty() bool {
m := atomic.AddUint32(&c.penalty, 1)
if m > maxPenalty {
c.decPenalty()
return false
}
return true
}
func (c *lbClient) decPenalty() {
atomic.AddUint32(&c.penalty, ^uint32(0))
}
const (
maxPenalty = 300
penaltyDuration = 3 * time.Second
)