diff --git a/lib/storage/storage.go b/lib/storage/storage.go index fb4a99a4a..74001435b 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1101,8 +1101,8 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra prevMetricNameRaw []byte ) minTimestamp, maxTimestamp := s.tb.getMinMaxTimestamps() - // Return only the last error, since it has no sense in returning all errors. - var lastWarn error + // Return only the first error, since it has no sense in returning all errors. + var firstWarn error for i := range mrs { mr := &mrs[i] if math.IsNaN(mr.Value) { @@ -1112,13 +1112,19 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra } if mr.Timestamp < minTimestamp { // Skip rows with too small timestamps outside the retention. - lastWarn = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp) + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", + mr.Timestamp, minTimestamp) + } atomic.AddUint64(&s.tooSmallTimestampRows, 1) continue } if mr.Timestamp > maxTimestamp { // Skip rows with too big timestamps significantly exceeding the current time. - lastWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp) + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", + mr.Timestamp, maxTimestamp) + } atomic.AddUint64(&s.tooBigTimestampRows, 1) continue } @@ -1152,7 +1158,9 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // Do not stop adding rows on error - just skip invalid row. // This guarantees that invalid rows don't prevent // from adding valid rows into the storage. - lastWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) + } j-- continue } @@ -1162,14 +1170,16 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // Do not stop adding rows on error - just skip invalid row. // This guarantees that invalid rows don't prevent // from adding valid rows into the storage. - lastWarn = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) + if firstWarn == nil { + firstWarn = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) + } j-- continue } s.putTSIDToCache(&r.TSID, mr.MetricNameRaw) } - if lastWarn != nil { - logger.Errorf("warn occurred during rows addition: %s", lastWarn) + if firstWarn != nil { + logger.Errorf("warn occurred during rows addition: %s", firstWarn) } if is != nil { kbPool.Put(kb) @@ -1178,21 +1188,21 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra } rows = rows[:rowsLen+j] - var lastError error + var firstError error if err := s.tb.AddRows(rows); err != nil { - lastError = fmt.Errorf("cannot add rows to table: %s", err) + firstError = fmt.Errorf("cannot add rows to table: %s", err) } - if err := s.updatePerDateData(rows); err != nil && lastError == nil { - lastError = fmt.Errorf("cannot update per-date data: %s", err) + if err := s.updatePerDateData(rows); err != nil && firstError == nil { + firstError = fmt.Errorf("cannot update per-date data: %s", err) } - if lastError != nil { - return rows, fmt.Errorf("error occurred during rows addition: %s", lastError) + if firstError != nil { + return rows, fmt.Errorf("error occurred during rows addition: %s", firstError) } return rows, nil } func (s *Storage) updatePerDateData(rows []rawRow) error { - var lastError error + var firstError error var date uint64 var hour uint64 var prevTimestamp int64 @@ -1226,8 +1236,10 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { // when entries for all the active time series must be added to the index. // This should address https://github.com/VictoriaMetrics/VictoriaMetrics/issues/430 . if todayShare16bit > (metricID&(1<<16-1)) && !nextDayMetricIDs.Has(metricID) { - if err := idb.storeDateMetricID(date+1, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil && lastError == nil { - lastError = err + if err := idb.storeDateMetricID(date+1, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil { + if lastError == nil { + lastError = err + } continue } s.pendingNextDayMetricIDsLock.Lock() @@ -1261,15 +1273,17 @@ func (s *Storage) updatePerDateData(rows []rawRow) error { // Slow path: store the (date, metricID) entry in the indexDB. // It is OK if the (date, metricID) entry is added multiple times to db // by concurrent goroutines. - if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil && lastError == nil { - lastError = err + if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil { + if lastError == nil { + lastError = err + } continue } // The metric must be added to cache only after it has been successfully added to indexDB. s.dateMetricIDCache.Set(date, metricID) } - return lastError + return firstError } // dateMetricIDCache is fast cache for holding (date, metricID) entries.