diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 53af4c0e99..0bc3b19f86 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -265,6 +265,12 @@ func registerStorageMetrics(strg *storage.Storage) { metrics.NewGauge(`vm_date_metric_ids_search_hits_total`, func() float64 { return float64(idbm().DateMetricIDsSearchHits) }) + metrics.NewGauge(`vm_index_blocks_with_metric_ids_processed_total`, func() float64 { + return float64(idbm().IndexBlocksWithMetricIDsProcessed) + }) + metrics.NewGauge(`vm_index_blocks_with_metric_ids_incorrect_order_total`, func() float64 { + return float64(idbm().IndexBlocksWithMetricIDsIncorrectOrder) + }) metrics.NewGauge(`vm_assisted_merges_total{type="storage/small"}`, func() float64 { return float64(tm().SmallAssistedMerges) diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go index d5f558845a..8571e36400 100644 --- a/lib/storage/index_db.go +++ b/lib/storage/index_db.go @@ -201,6 +201,9 @@ type IndexDBMetrics struct { DateMetricIDsSearchCalls uint64 DateMetricIDsSearchHits uint64 + IndexBlocksWithMetricIDsProcessed uint64 + IndexBlocksWithMetricIDsIncorrectOrder uint64 + mergeset.TableMetrics } @@ -235,6 +238,9 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) { m.DateMetricIDsSearchCalls += atomic.LoadUint64(&db.dateMetricIDsSearchCalls) m.DateMetricIDsSearchHits += atomic.LoadUint64(&db.dateMetricIDsSearchHits) + m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed) + m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder) + db.tb.UpdateMetrics(&m.TableMetrics) db.doExtDB(func(extDB *indexDB) { extDB.tb.UpdateMetrics(&m.TableMetrics) @@ -2391,8 +2397,23 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { if len(tmm.pendingMetricIDs) > 0 { logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs) } - if err := checkItemsSorted(dstItems); err != nil { - logger.Errorf("please report this error at https://github.com/VictoriaMetrics/VictoriaMetrics/issues : %s", err) + if !checkItemsSorted(dstItems) { + // Items could become unsorted if initial items contain duplicate metricIDs: + // + // item1: 1, 1, 5 + // item2: 1, 4 + // + // Items could become the following after the merge: + // + // item1: 1, 5 + // item2: 1, 4 + // + // i.e. item1 > item2 + // + // Leave the original items unmerged, so they can be merged next time. + // This case should be quite rare - if multiple data points are simultaneously inserted + // into the same new time series from multiple concurrent goroutines. + atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1) dstData = append(dstData[:0], tmm.dataCopy...) dstItems = dstItems[:0] // tmm.itemsCopy can point to overwritten data, so it must be updated @@ -2402,26 +2423,30 @@ func mergeTagToMetricIDsRows(data []byte, items [][]byte) ([]byte, [][]byte) { dstItems = append(dstItems, buf[:len(item)]) buf = buf[len(item):] } - if err := checkItemsSorted(dstItems); err != nil { - logger.Panicf("BUG: the original items weren't sorted: %s", err) + if !checkItemsSorted(dstItems) { + logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems) } } putTagToMetricIDsRowsMerger(tmm) + atomic.AddUint64(&indexBlocksWithMetricIDsProcessed, 1) return dstData, dstItems } -func checkItemsSorted(items [][]byte) error { +var indexBlocksWithMetricIDsIncorrectOrder uint64 +var indexBlocksWithMetricIDsProcessed uint64 + +func checkItemsSorted(items [][]byte) bool { if len(items) == 0 { - return nil + return true } prevItem := items[0] for _, currItem := range items[1:] { if string(prevItem) > string(currItem) { - return fmt.Errorf("items aren't sorted: prevItem > currItem; prevItem=%X; currItem=%X; items=%X", prevItem, currItem, items) + return false } prevItem = currItem } - return nil + return true } // maxMetricIDsPerRow limits the number of metricIDs in tag->metricIDs row. diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go index 0bd00180bf..ea603cdbe1 100644 --- a/lib/storage/index_db_test.go +++ b/lib/storage/index_db_test.go @@ -25,15 +25,15 @@ func TestMergeTagToMetricIDsRows(t *testing.T) { data = append(data, item...) itemsB = append(itemsB, data[len(data)-len(item):]) } - if err := checkItemsSorted(itemsB); err != nil { - t.Fatalf("source items aren't sorted: %s", err) + if !checkItemsSorted(itemsB) { + t.Fatalf("source items aren't sorted; items:\n%q", itemsB) } resultData, resultItemsB := mergeTagToMetricIDsRows(data, itemsB) if len(resultItemsB) != len(expectedItems) { t.Fatalf("unexpected len(resultItemsB); got %d; want %d", len(resultItemsB), len(expectedItems)) } - if err := checkItemsSorted(resultItemsB); err != nil { - t.Fatalf("result items aren't sorted: %s", err) + if !checkItemsSorted(resultItemsB) { + t.Fatalf("result items aren't sorted; items:\n%q", resultItemsB) } for i, item := range resultItemsB { if !bytes.HasPrefix(resultData, item) {