diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 62b22ed7b8..76b89f6de8 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -107,9 +107,6 @@ func (sn *storageNode) flushBufLocked() error { } func (sn *storageNode) sendBufLocked(buf []byte) error { - // sizeBuf guarantees that the rows batch will be either fully - // read or fully discarded on the vmstorage side. - // sizeBuf is used for read optimization in vmstorage. if sn.bc == nil { if err := sn.dial(); err != nil { return fmt.Errorf("cannot dial %q: %s", sn.dialer.Addr(), err) @@ -118,11 +115,19 @@ func (sn *storageNode) sendBufLocked(buf []byte) error { if len(buf) == 0 { return nil } - deadline := time.Now().Add(30 * time.Second) + timeoutSeconds := len(buf) / 1e6 + if timeoutSeconds < 10 { + timeoutSeconds = 10 + } + timeout := time.Duration(timeoutSeconds) * time.Second + deadline := time.Now().Add(timeout) if err := sn.bc.SetWriteDeadline(deadline); err != nil { sn.closeBrokenConn() return fmt.Errorf("cannot set write deadline to %s: %s", deadline, err) } + // sizeBuf guarantees that the rows batch will be either fully + // read or fully discarded on the vmstorage side. + // sizeBuf is used for read optimization in vmstorage. sn.sizeBuf = encoding.MarshalUint64(sn.sizeBuf[:0], uint64(len(buf))) if _, err := sn.bc.Write(sn.sizeBuf); err != nil { sn.closeBrokenConn()