Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2021-06-11 13:00:09 +03:00
commit b348114dab
16 changed files with 141 additions and 143 deletions

View file

@ -1780,7 +1780,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-search.maxExportDuration duration
The maximum duration for /api/v1/export call (default 720h0m0s)
-search.maxLookback duration
Synonim to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons
Synonym to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons
-search.maxPointsPerTimeseries int
The maximum points per a single timeseries returned from /api/v1/query_range. This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points returned to graphing UI such as Grafana. There is no sense in setting this limit to values bigger than the horizontal resolution of the graph (default 30000)
-search.maxQueryDuration duration

View file

@ -34,23 +34,18 @@ type promRange struct {
func (r promInstant) metrics() ([]Metric, error) {
var result []Metric
var m Metric
for i, res := range r.Result {
f, err := strconv.ParseFloat(res.TV[1].(string), 64)
if err != nil {
return nil, fmt.Errorf("metric %v, unable to parse float64 from %s: %w", res, res.TV[1], err)
}
m.Labels = nil
var m Metric
for k, v := range r.Result[i].Labels {
m.AddLabel(k, v)
}
m.Timestamps = append(m.Timestamps, int64(res.TV[0].(float64)))
m.Values = append(m.Values, f)
result = append(result, m)
m.Values = m.Values[:0]
m.Labels = m.Labels[:0]
m.Timestamps = m.Timestamps[:0]
}
return result, nil
}

View file

@ -66,7 +66,7 @@ func TestVMInstantQuery(t *testing.T) {
case 5:
w.Write([]byte(`{"status":"success","data":{"resultType":"matrix"}}`))
case 6:
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]}]}}`))
w.Write([]byte(`{"status":"success","data":{"resultType":"vector","result":[{"metric":{"__name__":"vm_rows"},"value":[1583786142,"13763"]},{"metric":{"__name__":"vm_requests"},"value":[1583786140,"2000"]}]}}`))
}
})
@ -100,16 +100,23 @@ func TestVMInstantQuery(t *testing.T) {
if err != nil {
t.Fatalf("unexpected %s", err)
}
if len(m) != 1 {
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
if len(m) != 2 {
t.Fatalf("expected 2 metrics got %d in %+v", len(m), m)
}
expected := Metric{
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
Timestamps: []int64{1583786142},
Values: []float64{13763},
expected := []Metric{
{
Labels: []Label{{Value: "vm_rows", Name: "__name__"}},
Timestamps: []int64{1583786142},
Values: []float64{13763},
},
{
Labels: []Label{{Value: "vm_requests", Name: "__name__"}},
Timestamps: []int64{1583786140},
Values: []float64{2000},
},
}
if !reflect.DeepEqual(m[0], expected) {
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
if !reflect.DeepEqual(m, expected) {
t.Fatalf("unexpected metric %+v want %+v", m, expected)
}
g := NewGraphiteType()
@ -122,12 +129,12 @@ func TestVMInstantQuery(t *testing.T) {
if len(m) != 1 {
t.Fatalf("expected 1 metric got %d in %+v", len(m), m)
}
expected = Metric{
exp := Metric{
Labels: []Label{{Value: "constantLine(10)", Name: "name"}},
Timestamps: []int64{1611758403},
Values: []float64{10},
}
if !reflect.DeepEqual(m[0], expected) {
if !reflect.DeepEqual(m[0], exp) {
t.Fatalf("unexpected metric %+v want %+v", m[0], expected)
}
}

View file

@ -81,10 +81,24 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
r.Header.Set("vm-target-url", targetURL.String())
reverseProxy.ServeHTTP(w, r)
proxyRequest(w, r)
return true
}
func proxyRequest(w http.ResponseWriter, r *http.Request) {
defer func() {
err := recover()
if err == nil || err == http.ErrAbortHandler {
// Suppress http.ErrAbortHandler panic.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
return
}
// Forward other panics to the caller.
panic(err)
}()
reverseProxy.ServeHTTP(w, r)
}
var configReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
var reverseProxy = &httputil.ReverseProxy{

View file

@ -34,7 +34,7 @@ var (
latencyOffset = flag.Duration("search.latencyOffset", time.Second*30, "The time when data points become visible in query results after the collection. "+
"Too small value can result in incomplete last points for query results")
maxQueryLen = flagutil.NewBytes("search.maxQueryLen", 16*1024, "The maximum search query length in bytes")
maxLookback = flag.Duration("search.maxLookback", 0, "Synonim to -search.lookback-delta from Prometheus. "+
maxLookback = flag.Duration("search.maxLookback", 0, "Synonym to -search.lookback-delta from Prometheus. "+
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
"See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons")
maxStalenessInterval = flag.Duration("search.maxStalenessInterval", 0, "The maximum interval for staleness calculations. "+

View file

@ -87,7 +87,7 @@ var transformFuncs = map[string]transformFunc{
"label_match": transformLabelMatch,
"label_mismatch": transformLabelMismatch,
"union": transformUnion,
"": transformUnion, // empty func is a synonim to union
"": transformUnion, // empty func is a synonym to union
"keep_last_value": transformKeepLastValue,
"keep_next_value": transformKeepNextValue,
"interpolate": transformInterpolate,

View file

@ -6,6 +6,9 @@ sort: 15
## tip
* BUGFIX: vmalert: fix recording rules, which were broken in v1.61.0. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1369).
* BUGFIX: reset the on-disk cache for mapping from the full metric name to an internal metric id (e.g. `metric_name{labels} -> internal_metric_id`) after deleting metrics via [delete API](https://docs.victoriametrics.com/#how-to-delete-time-series). This should prevent from possible inconsistent state after unclean shutdown. This [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347).
## [v1.61.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.61.0)

View file

@ -1784,7 +1784,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-search.maxExportDuration duration
The maximum duration for /api/v1/export call (default 720h0m0s)
-search.maxLookback duration
Synonim to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons
Synonym to -search.lookback-delta from Prometheus. The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. See also '-search.maxStalenessInterval' flag, which has the same meaining due to historical reasons
-search.maxPointsPerTimeseries int
The maximum points per a single timeseries returned from /api/v1/query_range. This option doesn't limit the number of scanned raw samples in the database. The main purpose of this option is to limit the number of per-series points returned to graphing UI such as Grafana. There is no sense in setting this limit to values bigger than the horizontal resolution of the graph (default 30000)
-search.maxQueryDuration duration

View file

@ -216,9 +216,7 @@ func handlerWrapper(s *server, w http.ResponseWriter, r *http.Request, rh Reques
// The following recover() code works around this by explicitly stopping the process after logging the panic.
// See https://github.com/golang/go/issues/16542#issuecomment-246549902 for details.
defer func() {
// need to check for abortHandler
// https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1353
if err := recover(); err != nil && err != http.ErrAbortHandler {
if err := recover(); err != nil {
buf := make([]byte, 1<<20)
n := runtime.Stack(buf, false)
fmt.Fprintf(os.Stderr, "panic: %v\n\n%s", err, buf[:n])

View file

@ -91,14 +91,8 @@ type indexDB struct {
// Cache for fast TagFilters -> TSIDs lookup.
tagCache *workingsetcache.Cache
// Cache for fast MetricID -> TSID lookup.
metricIDCache *workingsetcache.Cache
// Cache for fast MetricID -> MetricName lookup.
metricNameCache *workingsetcache.Cache
// Cache for fast MetricName -> TSID lookups.
tsidCache *workingsetcache.Cache
// The parent storage.
s *Storage
// Cache for useless TagFilters entries, which have no tag filters
// matching low number of metrics.
@ -118,21 +112,12 @@ type indexDB struct {
// metricIDs, since it usually requires 1 bit per deleted metricID.
deletedMetricIDs atomic.Value
deletedMetricIDsUpdateLock sync.Mutex
// The minimum timestamp when queries with composite index can be used.
minTimestampForCompositeIndex int64
}
// openIndexDB opens index db from the given path with the given caches.
func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (*indexDB, error) {
if metricIDCache == nil {
logger.Panicf("BUG: metricIDCache must be non-nil")
}
if metricNameCache == nil {
logger.Panicf("BUG: metricNameCache must be non-nil")
}
if tsidCache == nil {
logger.Panicf("BUG: tsidCache must be nin-nil")
func openIndexDB(path string, s *Storage) (*indexDB, error) {
if s == nil {
logger.Panicf("BUG: Storage must be nin-nil")
}
tb, err := mergeset.OpenTable(path, invalidateTagCache, mergeTagToMetricIDsRows)
@ -151,13 +136,9 @@ func openIndexDB(path string, metricIDCache, metricNameCache, tsidCache *working
name: name,
tagCache: workingsetcache.New(mem/32, time.Hour),
metricIDCache: metricIDCache,
metricNameCache: metricNameCache,
tsidCache: tsidCache,
s: s,
uselessTagFiltersCache: workingsetcache.New(mem/128, time.Hour),
loopsPerDateTagFilterCache: workingsetcache.New(mem/128, time.Hour),
minTimestampForCompositeIndex: minTimestampForCompositeIndex,
}
is := db.getIndexSearch(noDeadline)
@ -249,7 +230,7 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)
m.MinTimestampForCompositeIndex = uint64(db.minTimestampForCompositeIndex)
m.MinTimestampForCompositeIndex = uint64(db.s.minTimestampForCompositeIndex)
m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions)
m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions)
@ -323,9 +304,7 @@ func (db *indexDB) decRef() {
db.loopsPerDateTagFilterCache.Stop()
db.tagCache = nil
db.metricIDCache = nil
db.metricNameCache = nil
db.tsidCache = nil
db.s = nil
db.uselessTagFiltersCache = nil
db.loopsPerDateTagFilterCache = nil
@ -376,7 +355,7 @@ func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
// must be checked by the caller.
buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
tmp := db.metricIDCache.Get(buf[:0], key[:])
tmp := db.s.metricIDCache.Get(buf[:0], key[:])
if len(tmp) == 0 {
// The TSID for the given metricID wasn't found in the cache.
return io.EOF
@ -390,19 +369,19 @@ func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
db.metricIDCache.Set(key[:], buf[:])
db.s.metricIDCache.Set(key[:], buf[:])
}
func (db *indexDB) getMetricNameFromCache(dst []byte, metricID uint64) []byte {
// There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller.
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
return db.metricNameCache.Get(dst, key[:])
return db.s.metricNameCache.Get(dst, key[:])
}
func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
db.metricNameCache.Set(key[:], metricName)
db.s.metricNameCache.Set(key[:], metricName)
}
func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
@ -1634,19 +1613,6 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
return nil
}
// Mark the found metricIDs as deleted.
items := getIndexItems()
for _, metricID := range metricIDs {
items.B = append(items.B, nsPrefixDeletedMetricID)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
err := db.tb.AddItems(items.Items)
putIndexItems(items)
if err != nil {
return err
}
// atomically add deleted metricIDs to an inmemory map.
dmis := &uint64set.Set{}
dmis.AddMulti(metricIDs)
@ -1656,11 +1622,25 @@ func (db *indexDB) deleteMetricIDs(metricIDs []uint64) error {
invalidateTagCache()
// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.
db.tsidCache.Reset()
db.s.resetAndSaveTSIDCache()
// Do not reset uselessTagFiltersCache, since the found metricIDs
// on cache miss are filtered out later with deletedMetricIDs.
return nil
// Store the metricIDs as deleted.
// Make this after updating the deletedMetricIDs and resetting caches
// in order to exclude the possibility of the inconsistent state when the deleted metricIDs
// remain available in the tsidCache after unclean shutdown.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1347
items := getIndexItems()
for _, metricID := range metricIDs {
items.B = append(items.B, nsPrefixDeletedMetricID)
items.B = encoding.MarshalUint64(items.B, metricID)
items.Next()
}
err := db.tb.AddItems(items.Items)
putIndexItems(items)
return err
}
func (db *indexDB) getDeletedMetricIDs() *uint64set.Set {
@ -1709,7 +1689,7 @@ func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int,
if len(tfss) == 0 {
return nil, nil
}
if tr.MinTimestamp >= db.minTimestampForCompositeIndex {
if tr.MinTimestamp >= db.s.minTimestampForCompositeIndex {
tfss = convertToCompositeTagFilterss(tfss)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mergeset"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/uint64set"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
@ -454,15 +455,11 @@ func TestMarshalUnmarshalTSIDs(t *testing.T) {
}
func TestIndexDBOpenClose(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
for i := 0; i < 5; i++ {
db, err := openIndexDB("test-index-db", metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB("test-index-db", s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -477,15 +474,11 @@ func TestIndexDB(t *testing.T) {
const metricGroups = 10
t.Run("serial", func(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-serial"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -515,7 +508,7 @@ func TestIndexDB(t *testing.T) {
// Re-open the db and verify it works as expected.
db.MustClose()
db, err = openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err = openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -531,15 +524,11 @@ func TestIndexDB(t *testing.T) {
})
t.Run("concurrent", func(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-concurrent"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -1463,15 +1452,11 @@ func TestMatchTagFilters(t *testing.T) {
}
func TestSearchTSIDWithTimeRange(t *testing.T) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
dbName := "test-index-db-ts-range"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
t.Fatalf("cannot open indexDB: %s", err)
}
@ -1724,3 +1709,20 @@ func toTFPointers(tfs []tagFilter) []*tagFilter {
}
return tfps
}
func newTestStorage() *Storage {
return &Storage{
cachePath: "test-storage-cache",
metricIDCache: workingsetcache.New(1234, time.Hour),
metricNameCache: workingsetcache.New(1234, time.Hour),
tsidCache: workingsetcache.New(1234, time.Hour),
}
}
func stopTestStorage(s *Storage) {
s.metricIDCache.Stop()
s.metricNameCache.Stop()
s.tsidCache.Stop()
fs.MustRemoveAll(s.cachePath)
}

View file

@ -7,8 +7,6 @@ import (
"strconv"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/workingsetcache"
)
func BenchmarkRegexpFilterMatch(b *testing.B) {
@ -42,15 +40,11 @@ func BenchmarkRegexpFilterMismatch(b *testing.B) {
func BenchmarkIndexDBAddTSIDs(b *testing.B) {
const recordsPerLoop = 1e3
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-index-db-add-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -107,15 +101,11 @@ func benchmarkIndexDBAddTSIDs(db *indexDB, tsid *TSID, mn *MetricName, startOffs
func BenchmarkHeadPostingForMatchers(b *testing.B) {
// This benchmark is equivalent to https://github.com/prometheus/prometheus/blob/23c0299d85bfeb5d9b59e994861553a25ca578e5/tsdb/head_bench_test.go#L52
// See https://www.robustperception.io/evaluating-performance-and-correctness for more details.
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-head-posting-for-matchers"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}
@ -286,15 +276,11 @@ func BenchmarkHeadPostingForMatchers(b *testing.B) {
}
func BenchmarkIndexDBGetTSIDs(b *testing.B) {
metricIDCache := workingsetcache.New(1234, time.Hour)
metricNameCache := workingsetcache.New(1234, time.Hour)
tsidCache := workingsetcache.New(1234, time.Hour)
defer metricIDCache.Stop()
defer metricNameCache.Stop()
defer tsidCache.Stop()
s := newTestStorage()
defer stopTestStorage(s)
const dbName = "bench-index-db-get-tsids"
db, err := openIndexDB(dbName, metricIDCache, metricNameCache, tsidCache, 0)
db, err := openIndexDB(dbName, s)
if err != nil {
b.Fatalf("cannot open indexDB: %s", err)
}

View file

@ -79,10 +79,10 @@ const finalPartsToMerge = 3
// Higher number of shards reduces CPU contention and increases the max bandwidth on multi-core systems.
var rawRowsShardsPerPartition = (cgroup.AvailableCPUs() + 7) / 8
// getMaxRowsPerPartition returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerPartition() int {
// getMaxRawRowsPerShard returns the maximum number of rows that haven't been converted into parts yet.
func getMaxRawRowsPerShard() int {
maxRawRowsPerPartitionOnce.Do(func() {
n := memory.Allowed() / 256 / int(unsafe.Sizeof(rawRow{}))
n := memory.Allowed() / rawRowsShardsPerPartition / 256 / int(unsafe.Sizeof(rawRow{}))
if n < 1e4 {
n = 1e4
}
@ -515,7 +515,7 @@ type rawRows struct {
}
func getRawRowsMaxSize() *rawRows {
size := getMaxRawRowsPerPartition()
size := getMaxRawRowsPerShard()
return getRawRowsWithSize(size)
}

View file

@ -201,7 +201,7 @@ func OpenStorage(path string, retentionMsecs int64, maxHourlySeries, maxDailySer
if err := fs.MkdirAllIfNotExist(idbSnapshotsPath); err != nil {
return nil, fmt.Errorf("cannot create %q: %w", idbSnapshotsPath, err)
}
idbCurr, idbPrev, err := openIndexDBTables(idbPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
idbCurr, idbPrev, err := s.openIndexDBTables(idbPath)
if err != nil {
return nil, fmt.Errorf("cannot open indexdb tables at %q: %w", idbPath, err)
}
@ -579,7 +579,7 @@ func (s *Storage) mustRotateIndexDB() {
// Create new indexdb table.
newTableName := nextIndexDBTableName()
idbNewPath := s.path + "/indexdb/" + newTableName
idbNew, err := openIndexDB(idbNewPath, s.metricIDCache, s.metricNameCache, s.tsidCache, s.minTimestampForCompositeIndex)
idbNew, err := openIndexDB(idbNewPath, s)
if err != nil {
logger.Panicf("FATAL: cannot create new indexDB at %q: %s", idbNewPath, err)
}
@ -599,7 +599,7 @@ func (s *Storage) mustRotateIndexDB() {
fs.MustSyncPath(s.path)
// Flush tsidCache, so idbNew can be populated with fresh data.
s.tsidCache.Reset()
s.resetAndSaveTSIDCache()
// Flush dateMetricIDCache, so idbNew can be populated with fresh data.
s.dateMetricIDCache.Reset()
@ -610,6 +610,11 @@ func (s *Storage) mustRotateIndexDB() {
// There is no need in resetting nextDayMetricIDs, since it should be automatically reset every day.
}
func (s *Storage) resetAndSaveTSIDCache() {
s.tsidCache.Reset()
s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
}
// MustClose closes the storage.
//
// It is expected that the s is no longer used during the close.
@ -624,9 +629,12 @@ func (s *Storage) MustClose() {
s.idb().MustClose()
// Save caches.
s.mustSaveAndStopCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.mustSaveAndStopCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.mustSaveAndStopCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.mustSaveCache(s.tsidCache, "MetricName->TSID", "metricName_tsid")
s.tsidCache.Stop()
s.mustSaveCache(s.metricIDCache, "MetricID->TSID", "metricID_tsid")
s.metricIDCache.Stop()
s.mustSaveCache(s.metricNameCache, "MetricID->MetricName", "metricID_metricName")
s.metricNameCache.Stop()
hmCurr := s.currHourMetricIDs.Load().(*hourMetricIDs)
s.mustSaveHourMetricIDs(hmCurr, "curr_hour_metric_ids")
@ -853,7 +861,10 @@ func (s *Storage) mustLoadCache(info, name string, sizeBytes int) *workingsetcac
return c
}
func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name string) {
func (s *Storage) mustSaveCache(c *workingsetcache.Cache, info, name string) {
saveCacheLock.Lock()
defer saveCacheLock.Unlock()
path := s.cachePath + "/" + name
logger.Infof("saving %s cache to %q...", info, path)
startTime := time.Now()
@ -862,11 +873,13 @@ func (s *Storage) mustSaveAndStopCache(c *workingsetcache.Cache, info, name stri
}
var cs fastcache.Stats
c.UpdateStats(&cs)
c.Stop()
logger.Infof("saved %s cache to %q in %.3f seconds; entriesCount: %d; sizeBytes: %d",
info, path, time.Since(startTime).Seconds(), cs.EntriesCount, cs.BytesSize)
}
// saveCacheLock prevents from data races when multiple concurrent goroutines save the same cache.
var saveCacheLock sync.Mutex
func nextRetentionDuration(retentionMsecs int64) time.Duration {
// Round retentionMsecs to days. This guarantees that per-day inverted index works as expected.
retentionMsecs = ((retentionMsecs + msecPerDay - 1) / msecPerDay) * msecPerDay
@ -2158,7 +2171,7 @@ func (s *Storage) putTSIDToCache(tsid *TSID, metricName []byte) {
s.tsidCache.Set(metricName, buf)
}
func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *workingsetcache.Cache, minTimestampForCompositeIndex int64) (curr, prev *indexDB, err error) {
func (s *Storage) openIndexDBTables(path string) (curr, prev *indexDB, err error) {
if err := fs.MkdirAllIfNotExist(path); err != nil {
return nil, nil, fmt.Errorf("cannot create directory %q: %w", path, err)
}
@ -2217,12 +2230,12 @@ func openIndexDBTables(path string, metricIDCache, metricNameCache, tsidCache *w
// Open the last two tables.
currPath := path + "/" + tableNames[len(tableNames)-1]
curr, err = openIndexDB(currPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
curr, err = openIndexDB(currPath, s)
if err != nil {
return nil, nil, fmt.Errorf("cannot open curr indexdb table at %q: %w", currPath, err)
}
prevPath := path + "/" + tableNames[len(tableNames)-2]
prev, err = openIndexDB(prevPath, metricIDCache, metricNameCache, tsidCache, minTimestampForCompositeIndex)
prev, err = openIndexDB(prevPath, s)
if err != nil {
curr.MustClose()
return nil, nil, fmt.Errorf("cannot open prev indexdb table at %q: %w", prevPath, err)

View file

@ -110,7 +110,7 @@ func createBenchTable(b *testing.B, path string, startTimestamp int64, rowsPerIn
func benchmarkTableSearch(b *testing.B, rowsCount, tsidsCount, tsidsSearch int, fetchData bool) {
startTimestamp := timestampFromTime(time.Now()) - 365*24*3600*1000
rowsPerInsert := getMaxRawRowsPerPartition()
rowsPerInsert := getMaxRawRowsPerShard()
tb := openBenchTable(b, startTimestamp, rowsPerInsert, rowsCount, tsidsCount)
tr := TimeRange{

View file

@ -52,7 +52,7 @@ var transformFuncs = map[string]bool{
"label_match": true,
"label_mismatch": true,
"union": true,
"": true, // empty func is a synonim to union
"": true, // empty func is a synonym to union
"keep_last_value": true,
"keep_next_value": true,
"interpolate": true,