From e2eac858b5445956890a8d2f20bed758749a0bf4 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 25 Aug 2019 15:28:32 +0300 Subject: [PATCH] lib/storage: calculate the maximum number of rows per small part from `-memory.allowedPercent` Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/159 This simplifies error detection additionally to the `vm_rows_ignored_total` counters. --- lib/storage/storage.go | 28 ++++++++++++++-------------- lib/storage/storage_test.go | 3 ++- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 9fa8d7fe3..cf4d8d14e 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -773,7 +773,9 @@ var ( ) func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { - var errors []error + // Return only the last error, since it has no sense in returning all errors. + var lastError error + var is *indexSearch var mn *MetricName var kb *bytesutil.ByteBuffer @@ -796,11 +798,13 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra } if mr.Timestamp < minTimestamp { // Skip rows with too small timestamps outside the retention. + lastError = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp) atomic.AddUint64(&s.tooSmallTimestampRows, 1) continue } if mr.Timestamp > maxTimestamp { // Skip rows with too big timestamps significantly exceeding the current time. + lastError = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp) atomic.AddUint64(&s.tooBigTimestampRows, 1) continue } @@ -830,8 +834,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // Do not stop adding rows on error - just skip invalid row. // This guarantees that invalid rows don't prevent // from adding valid rows into the storage. - err = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) - errors = append(errors, err) + lastError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) j-- continue } @@ -841,8 +844,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra // Do not stop adding rows on error - just skip invalid row. // This guarantees that invalid rows don't prevent // from adding valid rows into the storage. - err = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) - errors = append(errors, err) + lastError = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) j-- continue } @@ -856,18 +858,16 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra rows = rows[:rowsLen+j] if err := s.tb.AddRows(rows); err != nil { - err = fmt.Errorf("cannot add rows to table: %s", err) - errors = append(errors, err) + lastError = fmt.Errorf("cannot add rows to table: %s", err) } - errors = s.updateDateMetricIDCache(rows, errors) - if len(errors) > 0 { - // Return only the first error, since it has no sense in returning all errors. - return rows, fmt.Errorf("errors occurred during rows addition: %s", errors[0]) + lastError = s.updateDateMetricIDCache(rows, lastError) + if lastError != nil { + return rows, fmt.Errorf("errors occurred during rows addition: %s", lastError) } return rows, nil } -func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error { +func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error { var date uint64 var hour uint64 var prevTimestamp int64 @@ -909,11 +909,11 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error // by concurrent goroutines. s.dateMetricIDCache.Set(keyBuf, nil) if err := idb.storeDateMetricID(date, metricID, r.TSID.AccountID, r.TSID.ProjectID); err != nil { - errors = append(errors, err) + lastError = err continue } } - return errors + return lastError } func (s *Storage) updateCurrHourMetricIDs() { diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 958fca790..aabd4a42c 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -349,7 +349,8 @@ func testStorageRandTimestamps(s *Storage) error { mrs = append(mrs, mr) } if err := s.AddRows(mrs, defaultPrecisionBits); err != nil { - if !strings.Contains(err.Error(), "too big timestamp") { + errStr := err.Error() + if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") { return fmt.Errorf("unexpected error when adding mrs: %s", err) } }