lib/storage: substitute error message about unsorted items in the index block after metricIDs merge with counter

The origin of the error has been detected and documented in the code,
so it is enough to export a counter for such errors at `vm_index_blocks_with_metric_ids_incorrect_order_total`,
so it could be monitored and alerted on high error rates.

Export also the counter for processed index blocks with metricIDs - `vm_index_blocks_with_metric_ids_processed_total`,
so its' rate could be compared to `rate(vm_index_blocks_with_metric_ids_incorrect_order_total)`.
This commit is contained in:
Aliaksandr Valialkin 2019-11-06 14:24:48 +02:00
parent c567a4353a
commit 1c777e0245
3 changed files with 43 additions and 12 deletions

View file

@ -265,6 +265,12 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_date_metric_ids_search_hits_total`, func() float64 {
return float64(idbm().DateMetricIDsSearchHits)
})
metrics.NewGauge(`vm_index_blocks_with_metric_ids_processed_total`, func() float64 {
return float64(idbm().IndexBlocksWithMetricIDsProcessed)
})
metrics.NewGauge(`vm_index_blocks_with_metric_ids_incorrect_order_total`, func() float64 {
return float64(idbm().IndexBlocksWithMetricIDsIncorrectOrder)
})
metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 {
return float64(tm().SmallAssistedMerges)

View file

@ -201,6 +201,9 @@ type IndexDBMetrics struct {
DateMetricIDsSearchCalls uint64
DateMetricIDsSearchHits uint64
IndexBlocksWithMetricIDsProcessed uint64
IndexBlocksWithMetricIDsIncorrectOrder uint64
mergeset.TableMetrics
}
@ -235,6 +238,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls)
m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits)
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
db.tb.UpdateMetrics(&m.TableMetrics)
db.doExtDB(func(extDB *indexDB) {
extDB.tb.UpdateMetrics(&m.TableMetrics)
@ -2391,8 +2397,23 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
if len(tmm.pendingMetricIDs) > 0 {
logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs)
}
if err := checkItemsSorted(dstItems); err != nil {
logger.Errorf("please report this error at https://github.com/VictoriaMetrics/VictoriaMetrics/issues : %s", err)
if !checkItemsSorted(dstItems) {
// Items could become unsorted if initial items contain duplicate metricIDs:
//
// item1: 1, 1, 5
// item2: 1, 4
//
// Items could become the following after the merge:
//
// item1: 1, 5
// item2: 1, 4
//
// i.e. item1 > item2
//
// Leave the original items unmerged, so they can be merged next time.
// This case should be quite rare - if multiple data points are simultaneously inserted
// into the same new time series from multiple concurrent goroutines.
atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1)
dstData = append(dstData[:0], tmm.dataCopy...)
dstItems = dstItems[:0]
// tmm.itemsCopy can point to overwritten data, so it must be updated
@ -2402,26 +2423,30 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) {
dstItems = append(dstItems, buf[:len(item)])
buf = buf[len(item):]
}
if err := checkItemsSorted(dstItems); err != nil {
logger.Panicf("BUG: the original items weren't sorted: %s", err)
if !checkItemsSorted(dstItems) {
logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
}
}
putTagToMetricIDsRowsMerger(tmm)
atomic.AddUint64(&indexBlocksWithMetricIDsProcessed, 1)
return dstData, dstItems
}
func checkItemsSorted(items [][]byte) error {
var indexBlocksWithMetricIDsIncorrectOrder uint64
var indexBlocksWithMetricIDsProcessed uint64
func checkItemsSorted(items [][]byte) bool {
if len(items) == 0 {
return nil
return true
}
prevItem := items[0]
for _, currItem := range items[1:] {
if string(prevItem) > string(currItem) {
return fmt.Errorf("items aren't sorted: prevItem > currItem; prevItem=%X; currItem=%X; items=%X", prevItem, currItem, items)
return false
}
prevItem = currItem
}
return nil
return true
}
// maxMetricIDsPerRow limits the number of metricIDs in tag->metricIDs row.

View file

@ -25,15 +25,15 @@ func TestMergeTagToMetricIDsRows(t *testing.T) {
data = append(data, item...)
itemsB = append(itemsB, data[len(data)-len(item):])
}
if err := checkItemsSorted(itemsB); err != nil {
t.Fatalf("source items aren't sorted: %s", err)
if !checkItemsSorted(itemsB) {
t.Fatalf("source items aren't sorted; items:\n%q", itemsB)
}
resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB)
if len(resultItemsB) != len(expectedItems) {
t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems))
}
if err := checkItemsSorted(resultItemsB); err != nil {
t.Fatalf("result items aren't sorted: %s", err)
if !checkItemsSorted(resultItemsB) {
t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB)
}
for i, item := range resultItemsB {
if !bytes.HasPrefix(resultData, item) {