mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-02-09 15:27:11 +00:00
app/vminsert/netstorage: dynamically adjust timeouts for sending packets from vminsert to vmstorage depending on packet size
Bigger packets will have more chances to be sent to vmstorage.
This commit is contained in:
parent
568ff61dcf
commit
694cc59ed1
1 changed files with 9 additions and 4 deletions
|
@ -107,9 +107,6 @@ func (sn *storageNode) flushBufLocked() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sn *storageNode) sendBufLocked(buf []byte) error {
|
func (sn *storageNode) sendBufLocked(buf []byte) error {
|
||||||
// sizeBuf guarantees that the rows batch will be either fully
|
|
||||||
// read or fully discarded on the vmstorage side.
|
|
||||||
// sizeBuf is used for read optimization in vmstorage.
|
|
||||||
if sn.bc == nil {
|
if sn.bc == nil {
|
||||||
if err := sn.dial(); err != nil {
|
if err := sn.dial(); err != nil {
|
||||||
return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
|
return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err)
|
||||||
|
@ -118,11 +115,19 @@ func (sn *storageNode) sendBufLocked(buf []byte) error {
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
deadline := time.Now().Add(30 * time.Second)
|
timeoutSeconds := len(buf) / 1e6
|
||||||
|
if timeoutSeconds < 10 {
|
||||||
|
timeoutSeconds = 10
|
||||||
|
}
|
||||||
|
timeout := time.Duration(timeoutSeconds) * time.Second
|
||||||
|
deadline := time.Now().Add(timeout)
|
||||||
if err := sn.bc.SetWriteDeadline(deadline); err != nil {
|
if err := sn.bc.SetWriteDeadline(deadline); err != nil {
|
||||||
sn.closeBrokenConn()
|
sn.closeBrokenConn()
|
||||||
return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err)
|
return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err)
|
||||||
}
|
}
|
||||||
|
// sizeBuf guarantees that the rows batch will be either fully
|
||||||
|
// read or fully discarded on the vmstorage side.
|
||||||
|
// sizeBuf is used for read optimization in vmstorage.
|
||||||
sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf)))
|
sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf)))
|
||||||
if _, err := sn.bc.Write(sn.sizeBuf); err != nil {
|
if _, err := sn.bc.Write(sn.sizeBuf); err != nil {
|
||||||
sn.closeBrokenConn()
|
sn.closeBrokenConn()
|
||||||
|
|
Loading…
Reference in a new issue