From 1c777e02456af484011727bf5e569966fe8c2f49 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Wed, 6 Nov 2019 14:24:48 +0200 Subject: [PATCH] lib/storage: substitute error message about unsorted items in the index block after metricIDs merge with counter The origin of the error has been detected and documented in the code, so it is enough to export a counter for such errors at `vm_index_blocks_with_metric_ids_incorrect_order_total`, so it could be monitored and alerted on high error rates. Export also the counter for processed index blocks with metricIDs - `vm_index_blocks_with_metric_ids_processed_total`, so its' rate could be compared to `rate(vm_index_blocks_with_metric_ids_incorrect_order_total)`. --- app/vmstorage/main.go | 6 ++++++ lib/storage/index_db.go | 41 +++++++++++++++++++++++++++++------- lib/storage/index_db_test.go | 8 +++---- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 53af4c0e9..0bc3b19f8 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -265,6 +265,12 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_date_metric_ids_search_hits_total`, func() float64 { return float64(idbm().DateMetricIDsSearchHits) }) + metrics.NewGauge(`vm_index_blocks_with_metric_ids_processed_total`, func() float64 { + return float64(idbm().IndexBlocksWithMetricIDsProcessed) + }) + metrics.NewGauge(`vm_index_blocks_with_metric_ids_incorrect_order_total`, func() float64 { + return float64(idbm().IndexBlocksWithMetricIDsIncorrectOrder) + }) metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 { return float64(tm().SmallAssistedMerges) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d5f558845..8571e3640 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -201,6 +201,9 @@ type IndexDBMetrics struct { DateMetricIDsSearchCalls uint64 DateMetricIDsSearchHits uint64 + IndexBlocksWithMetricIDsProcessed uint64 + IndexBlocksWithMetricIDsIncorrectOrder uint64 + mergeset.TableMetrics } @@ -235,6 +238,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) + m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed) + m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder) + db.tb.UpdateMetrics(&m.TableMetrics) db.doExtDB(func(extDB *indexDB) { extDB.tb.UpdateMetrics(&m.TableMetrics) @@ -2391,8 +2397,23 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { if len(tmm.pendingMetricIDs) > 0 { logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs) } - if err := checkItemsSorted(dstItems); err != nil { - logger.Errorf("please report this error at https://github.com/VictoriaMetrics/VictoriaMetrics/issues : %s", err) + if !checkItemsSorted(dstItems) { + // Items could become unsorted if initial items contain duplicate metricIDs: + // + // item1: 1, 1, 5 + // item2: 1, 4 + // + // Items could become the following after the merge: + // + // item1: 1, 5 + // item2: 1, 4 + // + // i.e. item1 > item2 + // + // Leave the original items unmerged, so they can be merged next time. + // This case should be quite rare - if multiple data points are simultaneously inserted + // into the same new time series from multiple concurrent goroutines. + atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1) dstData = append(dstData[:0], tmm.dataCopy...) dstItems = dstItems[:0] // tmm.itemsCopy can point to overwritten data, so it must be updated @@ -2402,26 +2423,30 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { dstItems = append(dstItems, buf[:len(item)]) buf = buf[len(item):] } - if err := checkItemsSorted(dstItems); err != nil { - logger.Panicf("BUG: the original items weren't sorted: %s", err) + if !checkItemsSorted(dstItems) { + logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems) } } putTagToMetricIDsRowsMerger(tmm) + atomic.AddUint64(&indexBlocksWithMetricIDsProcessed, 1) return dstData, dstItems } -func checkItemsSorted(items [][]byte) error { +var indexBlocksWithMetricIDsIncorrectOrder uint64 +var indexBlocksWithMetricIDsProcessed uint64 + +func checkItemsSorted(items [][]byte) bool { if len(items) == 0 { - return nil + return true } prevItem := items[0] for _, currItem := range items[1:] { if string(prevItem) > string(currItem) { - return fmt.Errorf("items aren't sorted: prevItem > currItem; prevItem=%X; currItem=%X; items=%X", prevItem, currItem, items) + return false } prevItem = currItem } - return nil + return true } // maxMetricIDsPerRow limits the number of metricIDs in tag->metricIDs row. diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 0bd00180b..ea603cdbe 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -25,15 +25,15 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { data = append(data, item...) itemsB = append(itemsB, data[len(data)-len(item):]) } - if err := checkItemsSorted(itemsB); err != nil { - t.Fatalf("source items aren't sorted: %s", err) + if !checkItemsSorted(itemsB) { + t.Fatalf("source items aren't sorted; items:\n%q", itemsB) } resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB) if len(resultItemsB) != len(expectedItems) { t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems)) } - if err := checkItemsSorted(resultItemsB); err != nil { - t.Fatalf("result items aren't sorted: %s", err) + if !checkItemsSorted(resultItemsB) { + t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB) } for i, item := range resultItemsB { if !bytes.HasPrefix(resultData, item) {