diff --git a/README.md b/README.md
index d628f0226..8f8b1642d 100644
--- a/README.md
+++ b/README.md
@@ -1780,7 +1780,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -search.maxExportDuration duration
     	The maximum duration for /api/v1/export call (default 720h0m0s)
   -search.maxLookback duration
-    	Synonim to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons
+    	Synonym to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaning due to historical reasons
   -search.maxPointsPerTimeseries int
     	The maximum points per a single timeseries returned from /api/v1/query_range. This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points returned to graphing UI such as Grafana. There is no sense in setting this limit to values bigger than the horizontal resolution of the graph (default 30000)
   -search.maxQueryDuration duration
diff --git a/app/vmalert/datasource/vm_prom_api.go b/app/vmalert/datasource/vm_prom_api.go
index 6373b4ce4..e1e1b4937 100644
--- a/app/vmalert/datasource/vm_prom_api.go
+++ b/app/vmalert/datasource/vm_prom_api.go
@@ -34,23 +34,18 @@ type promRange struct {
 
 func (r promInstant) metrics() ([]Metric, error) {
 	var result []Metric
-	var m Metric
 	for i, res := range r.Result {
 		f, err := strconv.ParseFloat(res.TV[1].(string), 64)
 		if err != nil {
 			return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
 		}
-		m.Labels = nil
+		var m Metric
 		for k, v := range r.Result[i].Labels {
 			m.AddLabel(k, v)
 		}
 		m.Timestamps = append(m.Timestamps, int64(res.TV[0].(float64)))
 		m.Values = append(m.Values, f)
 		result = append(result, m)
-
-		m.Values = m.Values[:0]
-		m.Labels = m.Labels[:0]
-		m.Timestamps = m.Timestamps[:0]
 	}
 	return result, nil
 }
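Why this `metrics()` change fixes the vmalert recording rules broken in v1.61.0 (issue 1369): the old code reused a single `Metric` value across loop iterations, so every element appended to `result` shared the same backing arrays for `Labels`, `Timestamps` and `Values`, and the trailing `[:0]` truncations let the next iteration overwrite data already stored in `result`. Declaring a fresh `Metric` per iteration removes the aliasing. A minimal self-contained sketch of the effect (simplified `Metric` type, not vmalert's actual one):

```go
package main

import "fmt"

// Metric mirrors the shape used by vmalert, simplified for illustration.
type Metric struct {
	Values []float64
}

func main() {
	var m Metric
	var result []Metric

	// Buggy pattern: reuse m across iterations and truncate its slice.
	for _, v := range []float64{1, 2, 3} {
		m.Values = append(m.Values, v)
		result = append(result, m) // copies the slice header, shares the array
		m.Values = m.Values[:0]    // next append overwrites the shared memory
	}
	fmt.Println(result) // all entries alias one array: [{[3]} {[3]} {[3]}]

	// Fixed pattern: a fresh Metric per iteration, as in the patch above.
	result = result[:0]
	for _, v := range []float64{1, 2, 3} {
		var m Metric
		m.Values = append(m.Values, v)
		result = append(result, m)
	}
	fmt.Println(result) // [{[1]} {[2]} {[3]}]
}
```

The second result in the test below (`vm_requests`) exists precisely to catch this class of bug: with a single-element response the aliasing was invisible.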
diff --git a/app/vmalert/datasource/vm_test.go b/app/vmalert/datasource/vm_test.go
index 622a1e34a..c86deb3cd 100644
--- a/app/vmalert/datasource/vm_test.go
+++ b/app/vmalert/datasource/vm_test.go
@@ -66,7 +66,7 @@ func TestVMInstantQuery(t *testing.T) {
 		case 5:
 			w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`))
 		case 6:
-			w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]}]}}`))
+			w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests"},"value":[1583786140,"2000"]}]}}`))
 		}
 	})
 
@@ -100,16 +100,23 @@ func TestVMInstantQuery(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected %s", err)
 	}
-	if len(m) != 1 {
-		t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
+	if len(m) != 2 {
+		t.Fatalf("expected 2 metrics got %d in %+v", len(m), m)
 	}
-	expected := Metric{
-		Labels:     []Label{{Value: "vm_rows", Name: "__name__"}},
-		Timestamps: []int64{1583786142},
-		Values:     []float64{13763},
+	expected := []Metric{
+		{
+			Labels:     []Label{{Value: "vm_rows", Name: "__name__"}},
+			Timestamps: []int64{1583786142},
+			Values:     []float64{13763},
+		},
+		{
+			Labels:     []Label{{Value: "vm_requests", Name: "__name__"}},
+			Timestamps: []int64{1583786140},
+			Values:     []float64{2000},
+		},
 	}
-	if !reflect.DeepEqual(m[0], expected) {
-		t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
+	if !reflect.DeepEqual(m, expected) {
+		t.Fatalf("unexpected metric %+v want %+v", m, expected)
 	}
 
 	g := NewGraphiteType()
@@ -122,12 +129,12 @@ func TestVMInstantQuery(t *testing.T) {
 	if len(m) != 1 {
 		t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
 	}
-	expected = Metric{
+	exp := Metric{
 		Labels:     []Label{{Value: "constantLine(10)", Name: "name"}},
 		Timestamps: []int64{1611758403},
 		Values:     []float64{10},
 	}
-	if !reflect.DeepEqual(m[0], expected) {
-		t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
+	if !reflect.DeepEqual(m[0], exp) {
+		t.Fatalf("unexpected metric %+v want %+v", m[0], exp)
 	}
 }
diff --git a/app/vmauth/main.go b/app/vmauth/main.go
index 2825a93dc..b12672fcc 100644
--- a/app/vmauth/main.go
+++ b/app/vmauth/main.go
@@ -81,10 +81,24 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
 		return true
 	}
 	r.Header.Set("vm-target-url", targetURL.String())
-	reverseProxy.ServeHTTP(w, r)
+	proxyRequest(w, r)
 	return true
 }
 
+func proxyRequest(w http.ResponseWriter, r *http.Request) {
+	defer func() {
+		err := recover()
+		if err == nil || err == http.ErrAbortHandler {
+			// Suppress http.ErrAbortHandler panic.
+			// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
+			return
+		}
+		// Forward other panics to the caller.
+		panic(err)
+	}()
+	reverseProxy.ServeHTTP(w, r)
+}
+
 var configReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
 
 var reverseProxy = &httputil.ReverseProxy{
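The `proxyRequest` wrapper exists because `net/http/httputil.ReverseProxy` deliberately aborts a request by panicking with `http.ErrAbortHandler` (e.g. when the client disconnects mid-response); that one panic is expected and must not crash the process, while any other panic is a genuine bug and should propagate. A minimal standalone sketch of the same pattern (hypothetical `safeProxy` name; any `http.Handler` can be wrapped this way):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// safeProxy swallows the expected http.ErrAbortHandler panic and
// re-panics on anything else, mirroring proxyRequest above.
func safeProxy(h http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			err := recover()
			if err == nil || err == http.ErrAbortHandler {
				// The response was aborted; nothing useful to log.
				return
			}
			// Unexpected panic - let the outer server handle it.
			panic(err)
		}()
		h.ServeHTTP(w, r)
	})
}

func main() {
	h := safeProxy(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		panic(http.ErrAbortHandler) // simulate an aborted proxied response
	}))
	req := httptest.NewRequest("GET", "http://example.com/", nil)
	h.ServeHTTP(httptest.NewRecorder(), req) // returns normally instead of panicking
	fmt.Println("ErrAbortHandler panic suppressed")
}
```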
"+ diff --git a/app/vmselect/promql/transform.go b/app/vmselect/promql/transform.go index 18998475d..68a75fead 100644 --- a/app/vmselect/promql/transform.go +++ b/app/vmselect/promql/transform.go @@ -87,7 +87,7 @@ var transformFuncs = map[string]transformFunc{ "label_match": transformLabelMatch, "label_mismatch": transformLabelMismatch, "union": transformUnion, - "": transformUnion, // empty func is a synonim to union + "": transformUnion, // empty func is a synonym to union "keep_last_value": transformKeepLastValue, "keep_next_value": transformKeepNextValue, "interpolate": transformInterpolate, diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3ff024bcf..e4850f3b6 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,9 @@ sort: 15 ## tip +* BUGFIX: vmalert: fix recording rules, which were broken in v1.61.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1369). +* BUGFIX: reset the on-disk cache for mapping from the full metric name to an internal metric id (e.g. `metric_name{labels} -> internal_metric_id`) after deleting metrics via [delete API](https://docs.victoriametrics.com/#how-to-delete-time-series). This should prevent from possible inconsistent state after unclean shutdown. This [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347). + ## [v1.61.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.61.0) diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index 65999e0d3..ad9d01f43 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -1784,7 +1784,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li -search.maxExportDuration duration The maximum duration for /api/v1/export call (default 720h0m0s) -search.maxLookback duration - Synonim to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons + Synonym to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons -search.maxPointsPerTimeseries int The maximum points per a single timeseries returned from /api/v1/query_range. This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points returned to graphing UI such as Grafana. There is no sense in setting this limit to values bigger than the horizontal resolution of the graph (default 30000) -search.maxQueryDuration duration diff --git a/lib/httpserver/httpserver.go b/lib/httpserver/httpserver.go index 9c61bf767..a5dff2e55 100644 --- a/lib/httpserver/httpserver.go +++ b/lib/httpserver/httpserver.go @@ -216,9 +216,7 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques // The following recover() code works around this by explicitly stopping the process after logging the panic. // See https://github.com/golang/go/issues/16542#issuecomment-246549902 for details. 
 	defer func() {
-		// need to check for abortHandler
-		// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
-		if err := recover(); err != nil && err != http.ErrAbortHandler {
+		if err := recover(); err != nil {
 			buf := make([]byte, 1<<20)
 			n := runtime.Stack(buf, false)
 			fmt.Fprintf(os.Stderr, "panic: %v\n\n%s", err, buf[:n])
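Together with the vmauth change above, this restores a clean division of responsibility: the one benign panic (`http.ErrAbortHandler`) is now filtered at its source, so the generic HTTP server wrapper may again treat *every* panic reaching it as fatal. A minimal sketch of that outer policy (hypothetical `crashOnPanic` name; the actual shutdown call after logging is outside the visible hunk, sketched here as `os.Exit(1)`):

```go
package main

import (
	"fmt"
	"os"
	"runtime"
)

// crashOnPanic mimics handlerWrapper: any panic reaching this level is a
// bug, so log the goroutine stack and stop the process instead of letting
// the runtime unwind into an undefined state.
func crashOnPanic(f func()) {
	defer func() {
		if err := recover(); err != nil {
			buf := make([]byte, 1<<20)
			n := runtime.Stack(buf, false) // capture the current goroutine's stack
			fmt.Fprintf(os.Stderr, "panic: %v\n\n%s", err, buf[:n])
			os.Exit(1)
		}
	}()
	f()
}

func main() {
	crashOnPanic(func() {
		fmt.Println("normal request handling")
	})
}
```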
diff --git a/lib/storage/index_db.go b/lib/storage/index_db.go
index 6f210ee16..f2244438a 100644
--- a/lib/storage/index_db.go
+++ b/lib/storage/index_db.go
@@ -91,14 +91,8 @@ type indexDB struct {
 	// Cache for fast TagFilters -> TSIDs lookup.
 	tagCache *workingsetcache.Cache
 
-	// Cache for fast MetricID -> TSID lookup.
-	metricIDCache *workingsetcache.Cache
-
-	// Cache for fast MetricID -> MetricName lookup.
-	metricNameCache *workingsetcache.Cache
-
-	// Cache for fast MetricName -> TSID lookups.
-	tsidCache *workingsetcache.Cache
+	// The parent storage.
+	s *Storage
 
 	// Cache for useless TagFilters entries, which have no tag filters
 	// matching low number of metrics.
@@ -118,21 +112,12 @@ type indexDB struct {
 	// metricIDs, since it usually requires 1 bit per deleted metricID.
 	deletedMetricIDs           atomic.Value
 	deletedMetricIDsUpdateLock sync.Mutex
-
-	// The minimum timestamp when queries with composite index can be used.
-	minTimestampForCompositeIndex int64
 }
 
-// openIndexDB opens index db from the given path with the given caches.
-func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (*indexDB, error) {
-	if metricIDCache == nil {
-		logger.Panicf("BUG: metricIDCache must be non-nil")
-	}
-	if metricNameCache == nil {
-		logger.Panicf("BUG: metricNameCache must be non-nil")
-	}
-	if tsidCache == nil {
-		logger.Panicf("BUG: tsidCache must be nin-nil")
+// openIndexDB opens index db from the given path and the given parent storage.
+func openIndexDB(path string, s *Storage) (*indexDB, error) {
+	if s == nil {
+		logger.Panicf("BUG: Storage must be non-nil")
 	}
 
 	tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
@@ -151,13 +136,9 @@
 		name:     name,
 		tagCache: workingsetcache.New(mem/32, time.Hour),
 
-		metricIDCache:   metricIDCache,
-		metricNameCache: metricNameCache,
-		tsidCache:       tsidCache,
+		s: s,
 
 		uselessTagFiltersCache:     workingsetcache.New(mem/128, time.Hour),
 		loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
-
-		minTimestampForCompositeIndex: minTimestampForCompositeIndex,
 	}
 
 	is := db.getIndexSearch(noDeadline)
@@ -249,7 +230,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
 	m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
 	m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
 
-	m.MinTimestampForCompositeIndex = uint64(db.minTimestampForCompositeIndex)
+	m.MinTimestampForCompositeIndex = uint64(db.s.minTimestampForCompositeIndex)
 	m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions)
 	m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions)
 
@@ -323,9 +304,7 @@ func (db *indexDB) decRef() {
 	db.loopsPerDateTagFilterCache.Stop()
 
 	db.tagCache = nil
-	db.metricIDCache = nil
-	db.metricNameCache = nil
-	db.tsidCache = nil
+	db.s = nil
 	db.uselessTagFiltersCache = nil
 	db.loopsPerDateTagFilterCache = nil
 
@@ -376,7 +355,7 @@ func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
 	// must be checked by the caller.
 	buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))
 	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
-	tmp := db.metricIDCache.Get(buf[:0], key[:])
+	tmp := db.s.metricIDCache.Get(buf[:0], key[:])
 	if len(tmp) == 0 {
 		// The TSID for the given metricID wasn't found in the cache.
 		return io.EOF
@@ -390,19 +369,19 @@ func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
 func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
 	buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))
 	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
-	db.metricIDCache.Set(key[:], buf[:])
+	db.s.metricIDCache.Set(key[:], buf[:])
 }
 
 func (db *indexDB) getMetricNameFromCache(dst []byte, metricID uint64) []byte {
 	// There is no need in checking for deleted metricIDs here, since they
 	// must be checked by the caller.
 	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
-	return db.metricNameCache.Get(dst, key[:])
+	return db.s.metricNameCache.Get(dst, key[:])
 }
 
 func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
 	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
-	db.metricNameCache.Set(key[:], metricName)
+	db.s.metricNameCache.Set(key[:], metricName)
 }
 
 func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
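A side note on the cache helpers touched above: the keys (and the `TSID` values) are turned into byte slices without copying or encoding, by casting the value's address to a pointer to a fixed-size byte array. This is safe because `unsafe.Sizeof` is a compile-time constant, and native byte order is acceptable here because the key never leaves the host that wrote it. A standalone sketch of the cast:

```go
package main

import (
	"fmt"
	"unsafe"
)

func main() {
	metricID := uint64(0x1122334455667788)
	// View the 8 bytes of metricID in place; the array length must be a
	// compile-time constant, which unsafe.Sizeof provides.
	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
	// Prints in native byte order, e.g. "88 77 66 55 44 33 22 11" on little-endian.
	fmt.Printf("% x\n", key[:])
}
```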
@@ -1634,19 +1613,6 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
 		return nil
 	}
 
-	// Mark the found metricIDs as deleted.
-	items := getIndexItems()
-	for _, metricID := range metricIDs {
-		items.B = append(items.B, nsPrefixDeletedMetricID)
-		items.B = encoding.MarshalUint64(items.B, metricID)
-		items.Next()
-	}
-	err := db.tb.AddItems(items.Items)
-	putIndexItems(items)
-	if err != nil {
-		return err
-	}
-
 	// atomically add deleted metricIDs to an inmemory map.
 	dmis := &uint64set.Set{}
 	dmis.AddMulti(metricIDs)
@@ -1656,11 +1622,25 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
 	invalidateTagCache()
 
 	// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
-	db.tsidCache.Reset()
+	db.s.resetAndSaveTSIDCache()
 
 	// Do not reset uselessTagFiltersCache, since the found metricIDs
 	// on cache miss are filtered out later with deletedMetricIDs.
-	return nil
+
+	// Store the metricIDs as deleted.
+	// Do this after updating the deletedMetricIDs and resetting caches
+	// in order to exclude the possibility of an inconsistent state where the deleted metricIDs
+	// remain available in the tsidCache after an unclean shutdown.
+	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347
+	items := getIndexItems()
+	for _, metricID := range metricIDs {
+		items.B = append(items.B, nsPrefixDeletedMetricID)
+		items.B = encoding.MarshalUint64(items.B, metricID)
+		items.Next()
+	}
+	err := db.tb.AddItems(items.Items)
+	putIndexItems(items)
+	return err
 }
 
 func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
@@ -1709,7 +1689,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
 	if len(tfss) == 0 {
 		return nil, nil
 	}
-	if tr.MinTimestamp >= db.minTimestampForCompositeIndex {
+	if tr.MinTimestamp >= db.s.minTimestampForCompositeIndex {
 		tfss = convertToCompositeTagFilterss(tfss)
 	}
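The reordering inside `deleteMetricIDs` is the heart of the issue-1347 fix: cache invalidation (including persisting the emptied tsidCache via `resetAndSaveTSIDCache`) now happens *before* the deletions are recorded durably, so a crash between the two steps can never leave persisted deletions paired with a stale on-disk cache. A minimal sketch of the general ordering rule (hypothetical `store` type, not the actual indexDB):

```go
package main

import "fmt"

type store struct {
	cacheValid bool     // stands in for the on-disk tsidCache snapshot
	deleted    []uint64 // stands in for the persisted deletion records
}

func (s *store) deleteMetricIDs(ids []uint64) {
	// Step 1: drop (and persist the empty) cache first. If we crash after
	// this step, the worst case is a cold cache - correct, just slower.
	s.cacheValid = false

	// Step 2: only now record the deletions durably. If we crash before
	// this step, the metrics simply remain undeleted - also consistent.
	s.deleted = append(s.deleted, ids...)
}

func main() {
	s := &store{cacheValid: true}
	s.deleteMetricIDs([]uint64{42, 43})
	fmt.Printf("cacheValid=%v deleted=%v\n", s.cacheValid, s.deleted)
}
```

With the old order (persist deletions, then reset the in-memory cache), an unclean shutdown could leave the deletions on disk while the saved tsidCache still resolved the deleted metric names.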
diff --git a/lib/storage/index_db_test.go b/lib/storage/index_db_test.go
index 8678afcd0..681dd8525 100644
--- a/lib/storage/index_db_test.go
+++ b/lib/storage/index_db_test.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
@@ -454,15 +455,11 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) {
 }
 
 func TestIndexDBOpenClose(t *testing.T) {
-	metricIDCache := workingsetcache.New(1234, time.Hour)
-	metricNameCache := workingsetcache.New(1234, time.Hour)
-	tsidCache := workingsetcache.New(1234, time.Hour)
-	defer metricIDCache.Stop()
-	defer metricNameCache.Stop()
-	defer tsidCache.Stop()
+	s := newTestStorage()
+	defer stopTestStorage(s)
 
 	for i := 0; i < 5; i++ {
-		db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, 0)
+		db, err := openIndexDB("test-index-db", s)
 		if err != nil {
 			t.Fatalf("cannot open indexDB: %s", err)
 		}
@@ -477,15 +474,11 @@ func TestIndexDB(t *testing.T) {
 	const metricGroups = 10
 
 	t.Run("serial", func(t *testing.T) {
-		metricIDCache := workingsetcache.New(1234, time.Hour)
-		metricNameCache := workingsetcache.New(1234, time.Hour)
-		tsidCache := workingsetcache.New(1234, time.Hour)
-		defer metricIDCache.Stop()
-		defer metricNameCache.Stop()
-		defer tsidCache.Stop()
+		s := newTestStorage()
+		defer stopTestStorage(s)
 
 		dbName := "test-index-db-serial"
-		db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+		db, err := openIndexDB(dbName, s)
 		if err != nil {
 			t.Fatalf("cannot open indexDB: %s", err)
 		}
@@ -515,7 +508,7 @@ func TestIndexDB(t *testing.T) {
 
 		// Re-open the db and verify it works as expected.
 		db.MustClose()
-		db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+		db, err = openIndexDB(dbName, s)
 		if err != nil {
 			t.Fatalf("cannot open indexDB: %s", err)
 		}
@@ -531,15 +524,11 @@ func TestIndexDB(t *testing.T) {
 	})
 
 	t.Run("concurrent", func(t *testing.T) {
-		metricIDCache := workingsetcache.New(1234, time.Hour)
-		metricNameCache := workingsetcache.New(1234, time.Hour)
-		tsidCache := workingsetcache.New(1234, time.Hour)
-		defer metricIDCache.Stop()
-		defer metricNameCache.Stop()
-		defer tsidCache.Stop()
+		s := newTestStorage()
+		defer stopTestStorage(s)
 
 		dbName := "test-index-db-concurrent"
-		db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+		db, err := openIndexDB(dbName, s)
 		if err != nil {
 			t.Fatalf("cannot open indexDB: %s", err)
 		}
@@ -1463,15 +1452,11 @@ func TestMatchTagFilters(t *testing.T) {
 }
 
 func TestSearchTSIDWithTimeRange(t *testing.T) {
-	metricIDCache := workingsetcache.New(1234, time.Hour)
-	metricNameCache := workingsetcache.New(1234, time.Hour)
-	tsidCache := workingsetcache.New(1234, time.Hour)
-	defer metricIDCache.Stop()
-	defer metricNameCache.Stop()
-	defer tsidCache.Stop()
+	s := newTestStorage()
+	defer stopTestStorage(s)
 
 	dbName := "test-index-db-ts-range"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+	db, err := openIndexDB(dbName, s)
 	if err != nil {
 		t.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -1724,3 +1709,20 @@ func toTFPointers(tfs []tagFilter) []*tagFilter {
 	}
 	return tfps
 }
+
+func newTestStorage() *Storage {
+	return &Storage{
+		cachePath: "test-storage-cache",
+
+		metricIDCache:   workingsetcache.New(1234, time.Hour),
+		metricNameCache: workingsetcache.New(1234, time.Hour),
+		tsidCache:       workingsetcache.New(1234, time.Hour),
+	}
+}
+
+func stopTestStorage(s *Storage) {
+	s.metricIDCache.Stop()
+	s.metricNameCache.Stop()
+	s.tsidCache.Stop()
+	fs.MustRemoveAll(s.cachePath)
+}
diff --git a/lib/storage/index_db_timing_test.go b/lib/storage/index_db_timing_test.go
index 08aaf2463..40ae12409 100644
--- a/lib/storage/index_db_timing_test.go
+++ b/lib/storage/index_db_timing_test.go
@@ -7,8 +7,6 @@ import (
 	"strconv"
 	"testing"
 	"time"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
 )
 
 func BenchmarkRegexpFilterMatch(b *testing.B) {
@@ -42,15 +40,11 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) {
 func BenchmarkIndexDBAddTSIDs(b *testing.B) {
 	const recordsPerLoop = 1e3
 
-	metricIDCache := workingsetcache.New(1234, time.Hour)
-	metricNameCache := workingsetcache.New(1234, time.Hour)
-	tsidCache := workingsetcache.New(1234, time.Hour)
-	defer metricIDCache.Stop()
-	defer metricNameCache.Stop()
-	defer tsidCache.Stop()
+	s := newTestStorage()
+	defer stopTestStorage(s)
 
 	const dbName = "bench-index-db-add-tsids"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+	db, err := openIndexDB(dbName, s)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -107,15 +101,11 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
 func BenchmarkHeadPostingForMatchers(b *testing.B) {
 	// This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52
 	// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
-	metricIDCache := workingsetcache.New(1234, time.Hour)
-	metricNameCache := workingsetcache.New(1234, time.Hour)
-	tsidCache := workingsetcache.New(1234, time.Hour)
-	defer metricIDCache.Stop()
-	defer metricNameCache.Stop()
-	defer tsidCache.Stop()
+	s := newTestStorage()
+	defer stopTestStorage(s)
 
 	const dbName = "bench-head-posting-for-matchers"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+	db, err := openIndexDB(dbName, s)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
@@ -286,15 +276,11 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
 }
 
 func BenchmarkIndexDBGetTSIDs(b *testing.B) {
-	metricIDCache := workingsetcache.New(1234, time.Hour)
-	metricNameCache := workingsetcache.New(1234, time.Hour)
-	tsidCache := workingsetcache.New(1234, time.Hour)
-	defer metricIDCache.Stop()
-	defer metricNameCache.Stop()
-	defer tsidCache.Stop()
+	s := newTestStorage()
+	defer stopTestStorage(s)
 
 	const dbName = "bench-index-db-get-tsids"
-	db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
+	db, err := openIndexDB(dbName, s)
 	if err != nil {
 		b.Fatalf("cannot open indexDB: %s", err)
 	}
diff --git a/lib/storage/partition.go b/lib/storage/partition.go
index 22ace7175..3288112f2 100644
--- a/lib/storage/partition.go
+++ b/lib/storage/partition.go
@@ -79,10 +79,10 @@ const finalPartsToMerge = 3
 // Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
 var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8
 
-// getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
-func getMaxRawRowsPerPartition() int {
+// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
+func getMaxRawRowsPerShard() int {
 	maxRawRowsPerPartitionOnce.Do(func() {
-		n := memory.Allowed() / 256 / int(unsafe.Sizeof(rawRow{}))
+		n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
 		if n < 1e4 {
 			n = 1e4
 		}
@@ -515,7 +515,7 @@ type rawRows struct {
 }
 
 func getRawRowsMaxSize() *rawRows {
-	size := getMaxRawRowsPerPartition()
+	size := getMaxRawRowsPerShard()
 	return getRawRowsWithSize(size)
 }
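The rename reflects a real semantic fix: the computed limit applies to each of the `rawRowsShardsPerPartition` shards, so the memory budget is now divided by the shard count and the total memory held in pending raw rows stays bounded regardless of how many shards a many-core machine gets. A rough worked example of the formula (illustrative numbers only; `memory.Allowed()`, the size of `rawRow` and the CPU count all vary by deployment):

```go
package main

import "fmt"

func main() {
	const (
		memoryAllowed = 4 << 30 // assume 4 GiB allowed for the process
		cpus          = 16      // assume 16 available CPUs
		rawRowSize    = 80      // assumed approximate unsafe.Sizeof(rawRow{})
	)
	shards := (cpus + 7) / 8 // rawRowsShardsPerPartition, as in the diff
	n := memoryAllowed / shards / 256 / rawRowSize
	if n < 1e4 {
		n = 1e4 // lower bound from the diff
	}
	// Before the fix the budget was not divided by shards, so total pending
	// memory grew linearly with the shard count.
	fmt.Printf("shards=%d maxRawRowsPerShard=%d\n", shards, n)
}
```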
diff --git a/lib/storage/storage.go b/lib/storage/storage.go
index 47d29dcbd..239dce203 100644
--- a/lib/storage/storage.go
+++ b/lib/storage/storage.go
@@ -201,7 +201,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
 	if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
 		return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
 	}
-	idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
+	idbCurr, idbPrev, err := s.openIndexDBTables(idbPath)
 	if err != nil {
 		return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
 	}
@@ -579,7 +579,7 @@ func (s *Storage) mustRotateIndexDB() {
 	// Create new indexdb table.
 	newTableName := nextIndexDBTableName()
 	idbNewPath := s.path + "/indexdb/" + newTableName
-	idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
+	idbNew, err := openIndexDB(idbNewPath, s)
 	if err != nil {
 		logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
 	}
@@ -599,7 +599,7 @@ func (s *Storage) mustRotateIndexDB() {
 	fs.MustSyncPath(s.path)
 
 	// Flush tsidCache, so idbNew can be populated with fresh data.
-	s.tsidCache.Reset()
+	s.resetAndSaveTSIDCache()
 
 	// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
 	s.dateMetricIDCache.Reset()
@@ -610,6 +610,11 @@ func (s *Storage) mustRotateIndexDB() {
 	// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
 }
 
+func (s *Storage) resetAndSaveTSIDCache() {
+	s.tsidCache.Reset()
+	s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
+}
+
 // MustClose closes the storage.
 //
 // It is expected that the s is no longer used during the close.
@@ -624,9 +629,12 @@ func (s *Storage) MustClose() {
 	s.idb().MustClose()
 
 	// Save caches.
-	s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
-	s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
-	s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
+	s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
+	s.tsidCache.Stop()
+	s.mustSaveCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
+	s.metricIDCache.Stop()
+	s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
+	s.metricNameCache.Stop()
 
 	hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
 	s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@@ -853,7 +861,10 @@ func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcac
 	return c
 }
 
-func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name string) {
+func (s *Storage) mustSaveCache(c *workingsetcache.Cache, info, name string) {
+	saveCacheLock.Lock()
+	defer saveCacheLock.Unlock()
+
 	path := s.cachePath + "/" + name
 	logger.Infof("saving %s cache to %q...", info, path)
 	startTime := time.Now()
@@ -862,11 +873,13 @@ func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name stri
 	}
 	var cs fastcache.Stats
 	c.UpdateStats(&cs)
-	c.Stop()
 	logger.Infof("saved %s cache to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d",
 		info, path, time.Since(startTime).Seconds(), cs.EntriesCount, cs.BytesSize)
 }
 
+// saveCacheLock prevents data races when multiple goroutines save the same cache concurrently.
+var saveCacheLock sync.Mutex
+
 func nextRetentionDuration(retentionMsecs int64) time.Duration {
 	// Round retentionMsecs to days. This guarantees that per-day inverted index works as expected.
 	retentionMsecs = ((retentionMsecs + msecPerDay - 1) / msecPerDay) * msecPerDay
@@ -2158,7 +2171,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
 	s.tsidCache.Set(metricName, buf)
 }
 
-func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (curr, prev *indexDB, err error) {
+func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) {
 	if err := fs.MkdirAllIfNotExist(path); err != nil {
 		return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
 	}
@@ -2217,12 +2230,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
 	// Open the last two tables.
 	currPath := path + "/" + tableNames[len(tableNames)-1]
-	curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
+	curr, err = openIndexDB(currPath, s)
 	if err != nil {
 		return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
 	}
 	prevPath := path + "/" + tableNames[len(tableNames)-2]
-	prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
+	prev, err = openIndexDB(prevPath, s)
 	if err != nil {
 		curr.MustClose()
 		return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)
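The storage.go changes split the old `mustSaveAndStopCache` into a repeatable `mustSaveCache` plus an explicit `Stop`: saving no longer implies shutting the cache down, so `resetAndSaveTSIDCache` can persist the emptied tsidCache from a live delete request, and `saveCacheLock` serializes a save triggered by the delete API against the one in `MustClose`. A minimal sketch of the lifecycle split (hypothetical `cache` type; `save` stands in for `workingsetcache.Cache.Save`):

```go
package main

import (
	"fmt"
	"sync"
)

var saveLock sync.Mutex // serializes concurrent savers, as saveCacheLock does

type cache struct{ entries map[string]string }

func (c *cache) save(path string) {
	saveLock.Lock()
	defer saveLock.Unlock()
	fmt.Printf("saved %d entries to %s\n", len(c.entries), path)
}

func (c *cache) reset() { c.entries = map[string]string{} }
func (c *cache) stop()  { fmt.Println("cache stopped") }

func main() {
	c := &cache{entries: map[string]string{"metric_name{labels}": "tsid"}}

	// Delete-API path: reset, then persist the empty state immediately,
	// so an unclean shutdown cannot resurrect deleted entries from disk.
	c.reset()
	c.save("metricName_tsid")

	// Shutdown path: save once more, then stop - two explicit steps now.
	c.save("metricName_tsid")
	c.stop()
}
```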
diff --git a/lib/storage/table_search_timing_test.go b/lib/storage/table_search_timing_test.go
index 927b54c40..cbe939d4b 100644
--- a/lib/storage/table_search_timing_test.go
+++ b/lib/storage/table_search_timing_test.go
@@ -110,7 +110,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
 
 func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) {
 	startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
-	rowsPerInsert := getMaxRawRowsPerPartition()
+	rowsPerInsert := getMaxRawRowsPerShard()
 
 	tb := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
 	tr := TimeRange{
diff --git a/vendor/github.com/VictoriaMetrics/metricsql/transform.go b/vendor/github.com/VictoriaMetrics/metricsql/transform.go
index 3dbac052a..bfeb53401 100644
--- a/vendor/github.com/VictoriaMetrics/metricsql/transform.go
+++ b/vendor/github.com/VictoriaMetrics/metricsql/transform.go
@@ -52,7 +52,7 @@ var transformFuncs = map[string]bool{
 	"label_match":     true,
 	"label_mismatch":  true,
 	"union":           true,
-	"":                true, // empty func is a synonim to union
+	"":                true, // empty func is a synonym to union
 	"keep_last_value": true,
 	"keep_next_value": true,
 	"interpolate":     true,