From b3c946e35a56f8d1e594e023289ebc14a408004f Mon Sep 17 00:00:00 2001 From: hanzai <hanzai@users.noreply.github.com> Date: Mon, 21 Oct 2019 04:38:51 +0800 Subject: [PATCH] warns during rows addition (#214) --- lib/storage/storage.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 5f43e01cd4..372b78144e 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -774,6 +774,7 @@ var ( func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { // Return only the last error, since it has no sense in returning all errors. var lastError error + var lastWarn error var is *indexSearch var mn *MetricName @@ -797,13 +798,13 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra } if mr.Timestamp < minTimestamp { // Skip rows with too small timestamps outside the retention. - lastError = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp) + lastWarn = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp) atomic.AddUint64(&s.tooSmallTimestampRows, 1) continue } if mr.Timestamp > maxTimestamp { // Skip rows with too big timestamps significantly exceeding the current time. - lastError = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp) + lastWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp) atomic.AddUint64(&s.tooBigTimestampRows, 1) continue } @@ -833,7 +834,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // Do not stop adding rows on error - just skip invalid row. // This guarantees that invalid rows don't prevent // from adding valid rows into the storage. - lastError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) + lastWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) j-- continue } @@ -843,7 +844,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // Do not stop adding rows on error - just skip invalid row. // This guarantees that invalid rows don't prevent // from adding valid rows into the storage. - lastError = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) + lastWarn = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) j-- continue } @@ -863,6 +864,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra if lastError != nil { return rows, fmt.Errorf("errors occurred during rows addition: %s", lastError) } + if lastWarn != nil { + logger.Errorf("warns occurred during rows addition: %s", lastWarn) + } return rows, nil }