lib/storage: calculate the maximum number of rows per small part from -memory.allowedPercent

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/159

This simplifies error detection additionally to the `vm_rows_ignored_total` counters.
This commit is contained in:
Aliaksandr Valialkin 2019-08-25 15:28:32 +03:00
parent 4b688fffee
commit 0dc0006f34
2 changed files with 16 additions and 15 deletions

View file

@ -769,7 +769,9 @@ var (
) )
func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) { func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]rawRow, error) {
var errors []error // Return only the last error, since it has no sense in returning all errors.
var lastError error
var is *indexSearch var is *indexSearch
var mn *MetricName var mn *MetricName
var kb *bytesutil.ByteBuffer var kb *bytesutil.ByteBuffer
@ -792,11 +794,13 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
} }
if mr.Timestamp < minTimestamp { if mr.Timestamp < minTimestamp {
// Skip rows with too small timestamps outside the retention. // Skip rows with too small timestamps outside the retention.
lastError = fmt.Errorf("cannot insert row with too small timestamp %d outside the retention; minimum allowed timestamp is %d", mr.Timestamp, minTimestamp)
atomic.AddUint64(&s.tooSmallTimestampRows, 1) atomic.AddUint64(&s.tooSmallTimestampRows, 1)
continue continue
} }
if mr.Timestamp > maxTimestamp { if mr.Timestamp > maxTimestamp {
// Skip rows with too big timestamps significantly exceeding the current time. // Skip rows with too big timestamps significantly exceeding the current time.
lastError = fmt.Errorf("cannot insert row with too big timestamp %d exceeding the current time; maximum allowd timestamp is %d", mr.Timestamp, maxTimestamp)
atomic.AddUint64(&s.tooBigTimestampRows, 1) atomic.AddUint64(&s.tooBigTimestampRows, 1)
continue continue
} }
@ -826,8 +830,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
// Do not stop adding rows on error - just skip invalid row. // Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent // This guarantees that invalid rows don't prevent
// from adding valid rows into the storage. // from adding valid rows into the storage.
err = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err) lastError = fmt.Errorf("cannot unmarshal MetricNameRaw %q: %s", mr.MetricNameRaw, err)
errors = append(errors, err)
j-- j--
continue continue
} }
@ -837,8 +840,7 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
// Do not stop adding rows on error - just skip invalid row. // Do not stop adding rows on error - just skip invalid row.
// This guarantees that invalid rows don't prevent // This guarantees that invalid rows don't prevent
// from adding valid rows into the storage. // from adding valid rows into the storage.
err = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err) lastError = fmt.Errorf("cannot obtain TSID for MetricName %q: %s", kb.B, err)
errors = append(errors, err)
j-- j--
continue continue
} }
@ -852,18 +854,16 @@ func (s *Storage) add(rows []rawRow, mrs []MetricRow, precisionBits uint8) ([]ra
rows = rows[:rowsLen+j] rows = rows[:rowsLen+j]
if err := s.tb.AddRows(rows); err != nil { if err := s.tb.AddRows(rows); err != nil {
err = fmt.Errorf("cannot add rows to table: %s", err) lastError = fmt.Errorf("cannot add rows to table: %s", err)
errors = append(errors, err)
} }
errors = s.updateDateMetricIDCache(rows, errors) lastError = s.updateDateMetricIDCache(rows, lastError)
if len(errors) > 0 { if lastError != nil {
// Return only the first error, since it has no sense in returning all errors. return rows, fmt.Errorf("errors occurred during rows addition: %s", lastError)
return rows, fmt.Errorf("errors occurred during rows addition: %s", errors[0])
} }
return rows, nil return rows, nil
} }
func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error { func (s *Storage) updateDateMetricIDCache(rows []rawRow, lastError error) error {
var date uint64 var date uint64
var hour uint64 var hour uint64
var prevTimestamp int64 var prevTimestamp int64
@ -905,11 +905,11 @@ func (s *Storage) updateDateMetricIDCache(rows []rawRow, errors []error) []error
// by concurrent goroutines. // by concurrent goroutines.
s.dateMetricIDCache.Set(keyBuf, nil) s.dateMetricIDCache.Set(keyBuf, nil)
if err := idb.storeDateMetricID(date, metricID); err != nil { if err := idb.storeDateMetricID(date, metricID); err != nil {
errors = append(errors, err) lastError = err
continue continue
} }
} }
return errors return lastError
} }
func (s *Storage) updateCurrHourMetricIDs() { func (s *Storage) updateCurrHourMetricIDs() {

View file

@ -349,7 +349,8 @@ func testStorageRandTimestamps(s *Storage) error {
mrs = append(mrs, mr) mrs = append(mrs, mr)
} }
if err := s.AddRows(mrs, defaultPrecisionBits); err != nil { if err := s.AddRows(mrs, defaultPrecisionBits); err != nil {
if !strings.Contains(err.Error(), "too big timestamp") { errStr := err.Error()
if !strings.Contains(errStr, "too big timestamp") && !strings.Contains(errStr, "too small timestamp") {
return fmt.Errorf("unexpected error when adding mrs: %s", err) return fmt.Errorf("unexpected error when adding mrs: %s", err)
} }
} }