warns during rows addition (#214)

This commit is contained in:
hanzai 2019-10-21 04:38:51 +08:00 committed by Aliaksandr Valialkin
parent 6823aaaf08
commit 52778da1f3

View file

@ -778,6 +778,7 @@ var (
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
// Return only the last error, since it has no sense in returning all errors. // Return only the last error, since it has no sense in returning all errors.
var lastError error var lastError error
var lastWarn error
var is *indexSearch var is *indexSearch
var mn *MetricName var mn *MetricName
@ -801,13 +802,13 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
} }
if mr.Timestamp < minTimestamp { if mr.Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention. // Skip rows with too small timestamps outside the retention.
lastError = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp) lastWarn = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp)
atomic.AddUint64(&s.tooSmallTimestampRows, 1) atomic.AddUint64(&s.tooSmallTimestampRows, 1)
continue continue
} }
if mr.Timestamp > maxTimestamp { if mr.Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time. // Skip rows with too big timestamps significantly exceeding the current time.
lastError = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp) lastWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp)
atomic.AddUint64(&s.tooBigTimestampRows, 1) atomic.AddUint64(&s.tooBigTimestampRows, 1)
continue continue
} }
@ -837,7 +838,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
// Do not stop adding rows on error - just skip invalid row. // Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent // This guarantees that invalid rows don't prevent
// from adding valid rows into the storage. // from adding valid rows into the storage.
lastError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) lastWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err)
j-- j--
continue continue
} }
@ -847,7 +848,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
// Do not stop adding rows on error - just skip invalid row. // Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent // This guarantees that invalid rows don't prevent
// from adding valid rows into the storage. // from adding valid rows into the storage.
lastError = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) lastWarn = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err)
j-- j--
continue continue
} }
@ -867,6 +868,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
if lastError != nil { if lastError != nil {
return rows, fmt.Errorf("errors occurred during rows addition: %s", lastError) return rows, fmt.Errorf("errors occurred during rows addition: %s", lastError)
} }
if lastWarn != nil {
logger.Errorf("warns occurred during rows addition: %s", lastWarn)
}
return rows, nil return rows, nil
} }