diff --git a/lib/protoparser/common/lines_reader.go b/lib/protoparser/common/lines_reader.go index ab666a11c..07433d76d 100644 --- a/lib/protoparser/common/lines_reader.go +++ b/lib/protoparser/common/lines_reader.go @@ -60,10 +60,14 @@ again: return dstBuf, tailBuf, nil } var ne net.Error - if errors.As(err, &ne) && ne.Timeout() && fasttime.UnixTimestamp() == startTime { - // Prevent from busy loop when timeout erorrs are returned immediately. - // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 . - return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error") + if errors.As(err, &ne) && ne.Timeout() { + if fasttime.UnixTimestamp() == startTime { + // Prevent from busy loop when timeout erorrs are returned immediately. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/696 . + return dstBuf, tailBuf, fmt.Errorf("detected busy loop with repeated timeout error") + } + // Return empty results for an ordinary timeout. + return dstBuf, tailBuf, nil } return dstBuf, tailBuf, err } diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index cdca2a3ed..3a1f30b8a 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -1,7 +1,6 @@ package graphite import ( - "errors" "flag" "fmt" "io" @@ -54,17 +53,11 @@ func (ctx *streamContext) Read(r io.Reader) bool { } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { - var ne net.Error - if errors.As(ctx.err, &ne) && ne.Timeout() { - // Flush the read data on timeout and try reading again. - ctx.err = nil - } else { - if ctx.err != io.EOF { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %w", ctx.err) - } - return false + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read graphite plaintext protocol data: %w", ctx.err) } + return false } ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows)) diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index dffa229c2..fae719063 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -1,7 +1,6 @@ package opentsdb import ( - "errors" "flag" "fmt" "io" @@ -53,17 +52,11 @@ func (ctx *streamContext) Read(r io.Reader) bool { } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { - var ne net.Error - if errors.As(ctx.err, &ne) && ne.Timeout() { - // Flush the read data on timeout and try reading again. - ctx.err = nil - } else { - if ctx.err != io.EOF { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %w", ctx.err) - } - return false + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %w", ctx.err) } + return false } ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows)) diff --git a/lib/protoparser/prometheus/streamparser.go b/lib/protoparser/prometheus/streamparser.go index fc87227b6..da5c5dac9 100644 --- a/lib/protoparser/prometheus/streamparser.go +++ b/lib/protoparser/prometheus/streamparser.go @@ -1,7 +1,6 @@ package prometheus import ( - "errors" "fmt" "io" "net" @@ -54,17 +53,11 @@ func (ctx *streamContext) Read(r io.Reader) bool { } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { - var ne net.Error - if errors.As(ctx.err, &ne) && ne.Timeout() { - // Flush the read data on timeout and try reading again. - ctx.err = nil - } else { - if ctx.err != io.EOF { - readErrors.Inc() - ctx.err = fmt.Errorf("cannot read Prometheus exposition data: %w", ctx.err) - } - return false + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read Prometheus exposition data: %w", ctx.err) } + return false } ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows))