mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/protoparser: prevent from busy loop on repeated timeout errors when reading streams of ingested data
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696
This commit is contained in:
parent
41d23f84ed
commit
4beab7ad39
1 changed files with 14 additions and 0 deletions
|
@ -2,10 +2,13 @@ package common
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
// The maximum size of a single line returned by ReadLinesBlock.
|
||||
|
@ -19,6 +22,8 @@ const defaultBlockSize = 64 * 1024
|
|||
// Trailing chars after the last newline are put into tailBuf.
|
||||
//
|
||||
// Returns (dstBuf, tailBuf).
|
||||
//
|
||||
// It is expected that read timeout on r exceeds 1 second.
|
||||
func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) {
|
||||
return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize)
|
||||
}
|
||||
|
@ -30,6 +35,8 @@ func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error)
|
|||
// Returns (dstBuf, tailBuf).
|
||||
//
|
||||
// maxLineLen limits the maximum length of a single line.
|
||||
//
|
||||
// It is expected that read timeout on r exceeds 1 second.
|
||||
func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) {
|
||||
if cap(dstBuf) < defaultBlockSize {
|
||||
dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize)
|
||||
|
@ -37,6 +44,7 @@ func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]b
|
|||
dstBuf = append(dstBuf[:0], tailBuf...)
|
||||
tailBuf = tailBuf[:0]
|
||||
again:
|
||||
startTime := fasttime.UnixTimestamp()
|
||||
n, err := r.Read(dstBuf[len(dstBuf):cap(dstBuf)])
|
||||
// Check for error only if zero bytes read from r, i.e. no forward progress made.
|
||||
// Otherwise process the read data.
|
||||
|
@ -51,6 +59,12 @@ again:
|
|||
// This fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/60 .
|
||||
return dstBuf, tailBuf, nil
|
||||
}
|
||||
var ne net.Error
|
||||
if errors.As(err, &ne) && ne.Timeout() && fasttime.UnixTimestamp() == startTime {
|
||||
// Prevent from busy loop when timeout erorrs are returned immediately.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 .
|
||||
return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error")
|
||||
}
|
||||
return dstBuf, tailBuf, err
|
||||
}
|
||||
dstBuf = dstBuf[:len(dstBuf)+n]
|
||||
|
|
Loading…
Reference in a new issue