mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
app/vmselect: prevent from closing connection to vmstorage on query timeout by setting +2 secs deadline on connection comparing to query deadline
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/711
This commit is contained in:
parent
eee6f1e56d
commit
a9205fe308
2 changed files with 16 additions and 14 deletions
|
@ -1316,12 +1316,21 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
|
||||||
<-sn.concurrentQueriesCh
|
<-sn.concurrentQueriesCh
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
d := time.Unix(int64(deadline.Deadline()), 0)
|
||||||
|
nowSecs := fasttime.UnixTimestamp()
|
||||||
|
currentTime := time.Unix(int64(nowSecs), 0)
|
||||||
|
timeout := d.Sub(currentTime)
|
||||||
|
if timeout <= 0 {
|
||||||
|
return fmt.Errorf("request timeout reached: %s", deadline.String())
|
||||||
|
}
|
||||||
bc, err := sn.connPool.Get()
|
bc, err := sn.connPool.Get()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot obtain connection from a pool: %w", err)
|
return fmt.Errorf("cannot obtain connection from a pool: %w", err)
|
||||||
}
|
}
|
||||||
d := time.Unix(int64(deadline.Deadline()), 0)
|
// Extend the connection deadline by 2 seconds, so the remote storage could return `timeout` error
|
||||||
if err := bc.SetDeadline(d); err != nil {
|
// without the need to break the connection.
|
||||||
|
connDeadline := d.Add(2 * time.Second)
|
||||||
|
if err := bc.SetDeadline(connDeadline); err != nil {
|
||||||
_ = bc.Close()
|
_ = bc.Close()
|
||||||
logger.Panicf("FATAL: cannot set connection deadline: %s", err)
|
logger.Panicf("FATAL: cannot set connection deadline: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -1333,16 +1342,8 @@ func (sn *storageNode) execOnConn(rpcName string, f func(bc *handshake.BufferedC
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
|
// Send the remaining timeout instead of deadline to remote server, since it may have different time.
|
||||||
now := fasttime.UnixTimestamp()
|
timeoutSecs := uint32(timeout.Seconds() + 1)
|
||||||
timeout := uint64(0)
|
if err := writeUint32(bc, timeoutSecs); err != nil {
|
||||||
if deadline.Deadline() > now {
|
|
||||||
timeout = deadline.Deadline() - now
|
|
||||||
}
|
|
||||||
if timeout > (1<<32)-2 {
|
|
||||||
timeout = (1 << 32) - 2
|
|
||||||
}
|
|
||||||
timeout++
|
|
||||||
if err := writeUint32(bc, uint32(timeout)); err != nil {
|
|
||||||
// Close the connection instead of returning it to the pool,
|
// Close the connection instead of returning it to the pool,
|
||||||
// since it may be broken.
|
// since it may be broken.
|
||||||
_ = bc.Close()
|
_ = bc.Close()
|
||||||
|
|
|
@ -15,8 +15,9 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
|
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
|
||||||
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for search query execution")
|
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
|
||||||
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses when some of vmstorage nodes are unavailable. This trades consistency over availability")
|
denyPartialResponse = flag.Bool("search.denyPartialResponse", false, "Whether to deny partial responses if a part of -storageNode instances fail to perform queries; "+
|
||||||
|
"this trades availability over consistency; see also -search.maxQueryDuration")
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetTime returns time from the given argKey query arg.
|
// GetTime returns time from the given argKey query arg.
|
||||||
|
|
Loading…
Reference in a new issue