Fixes error handling for promscrape.streamParse (#1009)

properly return error if client cannot read data,
properly suppress scraper errors
This commit is contained in:
Nikolay 2021-01-12 14:31:47 +03:00 committed by GitHub
parent 2c44f9989a
commit 7976c22797
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 15 additions and 8 deletions

View file

@ -31,7 +31,7 @@ func InsertHandler(req *http.Request) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
}, nil)
})
}

View file

@ -31,7 +31,7 @@ func InsertHandler(req *http.Request) error {
isGzipped := req.Header.Get("Content-Encoding") == "gzip"
return parser.ParseStream(req.Body, defaultTimestamp, isGzipped, func(rows []parser.Row) error {
return insertRows(rows, extraLabels)
})
}, nil)
})
}

View file

@ -343,7 +343,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
pushDataDuration.UpdateDuration(startTime)
wc.resetNoRows()
return nil
})
}, sw.logError)
responseSize = sr.bytesRead
sr.MustClose()
}
@ -373,7 +373,7 @@ func (sw *scrapeWork) scrapeStream(scrapeTimestamp, realTimestamp int64) error {
wc.reset()
writeRequestCtxPool.Put(wc)
tsmGlobal.Update(sw.Config, sw.ScrapeGroup, up == 1, realTimestamp, int64(duration*1000), err)
return nil
return err
}
// leveledWriteRequestCtxPool allows reducing memory usage when writeRequesCtx

View file

@ -18,7 +18,7 @@ import (
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold rows after returning.
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error) error {
func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback func(rows []Row) error, errLogger func(string)) error {
if isGzipped {
zr, err := common.GetGzipReader(r)
if err != nil {
@ -31,6 +31,7 @@ func ParseStream(r io.Reader, defaultTimestamp int64, isGzipped bool, callback f
defer putStreamContext(ctx)
for ctx.Read() {
uw := getUnmarshalWork()
uw.errLogger = errLogger
uw.callback = func(rows []Row) {
if err := callback(rows); err != nil {
ctx.callbackErrLock.Lock()
@ -133,6 +134,7 @@ var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
type unmarshalWork struct {
rows Rows
callback func(rows []Row)
errLogger func(string)
defaultTimestamp int64
reqBuf []byte
}
@ -140,13 +142,18 @@ type unmarshalWork struct {
func (uw *unmarshalWork) reset() {
uw.rows.Reset()
uw.callback = nil
uw.errLogger = nil
uw.defaultTimestamp = 0
uw.reqBuf = uw.reqBuf[:0]
}
// Unmarshal implements common.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
if uw.errLogger != nil {
uw.rows.UnmarshalWithErrLogger(bytesutil.ToUnsafeString(uw.reqBuf), uw.errLogger)
} else {
uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
}
rows := uw.rows.Rows
rowsRead.Add(len(rows))

View file

@ -31,7 +31,7 @@ func TestParseStream(t *testing.T) {
}
lock.Unlock()
return nil
})
}, nil)
if err != nil {
t.Fatalf("unexpected error when parsing %q: %s", s, err)
}
@ -64,7 +64,7 @@ func TestParseStream(t *testing.T) {
}
lock.Unlock()
return nil
})
}, nil)
if err != nil {
t.Fatalf("unexpected error when parsing compressed %q: %s", s, err)
}