Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

Aliaksandr Valialkin 2022-10-23 14:02:45 +03:00
commit db553f12bc
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
11 changed files with 330 additions and 381 deletions

View file

@ -2309,7 +2309,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -152,6 +152,9 @@ func InitRollupResultCache(cachePath string) {
metrics.GetOrCreateGauge(`vm_cache_size_bytes{type="promql/rollupResult"}`, func() float64 {
return float64(fcs().BytesSize)
})
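// Also expose the max size of the rollupResult cache, so its usage can be monitored as vm_cache_size_bytes / vm_cache_size_max_bytes.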
metrics.GetOrCreateGauge(`vm_cache_size_max_bytes{type="promql/rollupResult"}`, func() float64 {
return float64(fcs().MaxBytesSize)
})
metrics.GetOrCreateGauge(`vm_cache_requests_total{type="promql/rollupResult"}`, func() float64 {
return float64(fcs().GetCalls)
})

View file

@ -58,10 +58,14 @@ var (
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
)
// CheckTimeRange returns true if the given tr is denied for querying.
@ -101,7 +105,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.N)
storage.SetTagFilterCacheSize(cacheSizeIndexDBTagFilters.N)
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.N)
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N)
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N)
@ -605,19 +609,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().AddRowsConcurrencyCurrent)
})
metrics.NewGauge(`vm_concurrent_search_tsids_limit_reached_total`, func() float64 {
return float64(m().SearchTSIDsConcurrencyLimitReached)
})
metrics.NewGauge(`vm_concurrent_search_tsids_limit_timeout_total`, func() float64 {
return float64(m().SearchTSIDsConcurrencyLimitTimeout)
})
metrics.NewGauge(`vm_concurrent_search_tsids_capacity`, func() float64 {
return float64(m().SearchTSIDsConcurrencyCapacity)
})
metrics.NewGauge(`vm_concurrent_search_tsids_current`, func() float64 {
return float64(m().SearchTSIDsConcurrencyCurrent)
})
metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays)
})
@ -721,8 +712,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_entries{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSize)
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize())
@ -762,8 +753,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 {
return float64(m().NextDayMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeBytes)
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSizeBytes())
@ -793,8 +784,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeMaxBytes)
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMaxSizeBytes())
@ -821,8 +812,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheRequests)
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheRequests())
@ -849,8 +840,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheMisses)
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMisses())

View file

@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
**Update note 1:** the `indexdb/tagFilters` cache type at [/metrics](https://docs.victoriametrics.com/#monitoring) has been renamed to `indexdb/tagFiltersToMetricIDs` in order to make its purpose more clear.
* FEATURE: allow limiting memory usage on a per-query basis with `-search.maxMemoryPerQuery` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): drop all the labels with the `__` prefix from discovered targets in the same way as Prometheus does according to [this article](https://www.robustperception.io/life-of-a-label/). Previously the following labels were available during [metric-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs): `__address__`, `__scheme__`, `__metrics_path__`, `__scrape_interval__`, `__scrape_timeout__`, `__param_*`. Now these labels are available only during [target-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). This should reduce CPU and memory usage for `vmagent` setups which scrape a big number of targets.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for metric-level [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling), which can be applied via `metric_relabel_configs` section at [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs), via `-remoteWrite.relabelConfig` or via `-remoteWrite.urlRelabelConfig` command-line options.
@ -41,10 +43,12 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): limit the number of plotted series. This should prevent browser crashes or hangs when the query returns a big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3155).
* FEATURE: log an error if some environment variables referred to at `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent silent use of incorrect config files.
* FEATURE: immediately shut down VictoriaMetrics apps on the second SIGINT or SIGTERM signal if they couldn't be finished gracefully for some reason after receiving the first signal.
* FEATURE: improve the performance of [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) endpoint by eliminating loading of unused `TSID` data during the API call.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_request_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change the severity level for log messages about failed attempts to send data to remote storage from `error` to `warn`. The message about all failed send attempts remains at the `error` severity level.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not show the invalid error message in Kubernetes service discovery: `cannot parse WatchEvent json response: EOF`. The invalid error message appeared in [v1.82.0](https://docs.victoriametrics.com/CHANGELOG.html#v1820).
* BUGFIX: `vmselect`: expose the missing metric `vm_cache_size_max_bytes{type="promql/rollupResult"}`. This metric is used for monitoring rollup cache usage with the query `vm_cache_size_bytes{type="promql/rollupResult"} / vm_cache_size_max_bytes{type="promql/rollupResult"}` in the same way as it is done for other cache types.
## [v1.82.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.82.1)

View file

@ -1157,7 +1157,7 @@ Below is the output for `/path/to/vmstorage -help`:
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -2310,7 +2310,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -2313,7 +2313,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"sync"
@ -97,8 +98,8 @@ type indexDB struct {
extDB *indexDB
extDBLock sync.Mutex
// Cache for fast TagFilters -> TSIDs lookup.
tagFiltersCache *workingsetcache.Cache
// Cache for fast TagFilters -> MetricIDs lookup.
tagFiltersToMetricIDsCache *workingsetcache.Cache
// The parent storage.
s *Storage
@ -110,18 +111,18 @@ type indexDB struct {
indexSearchPool sync.Pool
}
var maxTagFilterCacheSize int
var maxTagFiltersCacheSize int
// SetTagFilterCacheSize overrides the default size of indexdb/tagFilters cache
func SetTagFilterCacheSize(size int) {
maxTagFilterCacheSize = size
// SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache
func SetTagFiltersCacheSize(size int) {
maxTagFiltersCacheSize = size
}
func getTagFilterCacheSize() int {
if maxTagFilterCacheSize <= 0 {
func getTagFiltersCacheSize() int {
if maxTagFiltersCacheSize <= 0 {
return int(float64(memory.Allowed()) / 32)
}
return maxTagFilterCacheSize
return maxTagFiltersCacheSize
}
// openIndexDB opens index db from the given path.
@ -147,8 +148,9 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
}
// Do not persist tagFiltersCache in files, since it is very volatile.
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
mem := memory.Allowed()
tagFiltersCacheSize := getTagFiltersCacheSize()
db := &indexDB{
refCount: 1,
@ -157,7 +159,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
tb: tb,
name: name,
tagFiltersCache: workingsetcache.New(getTagFilterCacheSize()),
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
}
@ -168,11 +170,11 @@ const noDeadline = 1<<64 - 1
// IndexDBMetrics contains essential metrics for indexDB.
type IndexDBMetrics struct {
TagFiltersCacheSize uint64
TagFiltersCacheSizeBytes uint64
TagFiltersCacheSizeMaxBytes uint64
TagFiltersCacheRequests uint64
TagFiltersCacheMisses uint64
TagFiltersToMetricIDsCacheSize uint64
TagFiltersToMetricIDsCacheSizeBytes uint64
TagFiltersToMetricIDsCacheSizeMaxBytes uint64
TagFiltersToMetricIDsCacheRequests uint64
TagFiltersToMetricIDsCacheMisses uint64
DeletedMetricsCount uint64
@ -210,12 +212,12 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
var cs fastcache.Stats
cs.Reset()
db.tagFiltersCache.UpdateStats(&cs)
m.TagFiltersCacheSize += cs.EntriesCount
m.TagFiltersCacheSizeBytes += cs.BytesSize
m.TagFiltersCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersCacheRequests += cs.GetCalls
m.TagFiltersCacheMisses += cs.Misses
db.tagFiltersToMetricIDsCache.UpdateStats(&cs)
m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount
m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize
m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls
m.TagFiltersToMetricIDsCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
@ -296,10 +298,10 @@ func (db *indexDB) decRef() {
db.SetExtDB(nil)
// Free space occupied by caches owned by db.
db.tagFiltersCache.Stop()
db.tagFiltersToMetricIDsCache.Stop()
db.loopsPerDateTagFilterCache.Stop()
db.tagFiltersCache = nil
db.tagFiltersToMetricIDsCache = nil
db.s = nil
db.loopsPerDateTagFilterCache = nil
@ -312,74 +314,36 @@ func (db *indexDB) decRef() {
logger.Infof("indexDB %q has been dropped", tbPath)
}
func (db *indexDB) getFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]TSID, bool) {
qt = qt.NewChild("search for tsids in tag filters cache")
var tagBufPool bytesutil.ByteBufferPool
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) {
qt = qt.NewChild("search for metricIDs in tag filters cache")
defer qt.Done()
compressedBuf := tagBufPool.Get()
defer tagBufPool.Put(compressedBuf)
compressedBuf.B = db.tagFiltersCache.GetBig(compressedBuf.B[:0], key)
if len(compressedBuf.B) == 0 {
buf := tagBufPool.Get()
defer tagBufPool.Put(buf)
buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key)
if len(buf.B) == 0 {
qt.Printf("cache miss")
return nil, false
}
if compressedBuf.B[0] == 0 {
// Fast path - tsids are stored in uncompressed form.
qt.Printf("found tsids with size: %d bytes", len(compressedBuf.B))
tsids, err := unmarshalTSIDs(nil, compressedBuf.B[1:])
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
}
qt.Printf("unmarshaled %d tsids", len(tsids))
return tsids, true
}
// Slow path - tsids are stored in compressed form.
qt.Printf("found tsids with compressed size: %d bytes", len(compressedBuf.B))
buf := tagBufPool.Get()
defer tagBufPool.Put(buf)
var err error
buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressedBuf.B[1:])
qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
metricIDs, err := unmarshalMetricIDs(nil, buf.B)
if err != nil {
logger.Panicf("FATAL: cannot decompress tsids from tagFiltersCache: %s", err)
logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
}
qt.Printf("decompressed tsids to %d bytes", len(buf.B))
tsids, err := unmarshalTSIDs(nil, buf.B)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
}
qt.Printf("unmarshaled %d tsids", len(tsids))
return tsids, true
qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
return metricIDs, true
}
var tagBufPool bytesutil.ByteBufferPool
func (db *indexDB) putToTagFiltersCache(qt *querytracer.Tracer, tsids []TSID, key []byte) {
qt = qt.NewChild("put %d tsids in cache", len(tsids))
func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) {
qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs))
defer qt.Done()
if len(tsids) <= 2 {
// Fast path - store small number of tsids in uncompressed form.
// This saves CPU time on compress / decompress.
buf := tagBufPool.Get()
buf.B = append(buf.B[:0], 0)
buf.B = marshalTSIDs(buf.B, tsids)
qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B))
db.tagFiltersCache.SetBig(key, buf.B)
qt.Printf("store %d tsids into cache", len(tsids))
tagBufPool.Put(buf)
return
}
// Slower path - store big number of tsids in compressed form.
// This increases cache capacity.
buf := tagBufPool.Get()
buf.B = marshalTSIDs(buf.B[:0], tsids)
qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B))
compressedBuf := tagBufPool.Get()
compressedBuf.B = append(compressedBuf.B[:0], 1)
compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B, buf.B, 1)
qt.Printf("compressed %d tsids into %d bytes", len(tsids), len(compressedBuf.B))
buf.B = marshalMetricIDs(buf.B, metricIDs)
qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B))
db.tagFiltersToMetricIDsCache.SetBig(key, buf.B)
qt.Printf("stored %d metricIDs into cache", len(metricIDs))
tagBufPool.Put(buf)
db.tagFiltersCache.SetBig(key, compressedBuf.B)
qt.Printf("stored %d compressed tsids into cache", len(tsids))
tagBufPool.Put(compressedBuf)
}
func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
@ -475,35 +439,44 @@ func invalidateTagFiltersCache() {
var tagFiltersKeyGen uint64
func marshalTSIDs(dst []byte, tsids []TSID) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(tsids)))
for i := range tsids {
dst = tsids[i].Marshal(dst)
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
if len(metricIDs) == 0 {
return dst
}
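// Reinterpret the metricIDs memory as a byte slice without copying it, so it can be appended to dst as-is.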
var buf []byte
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
sh.Cap = sh.Len
sh.Len = 8 * len(metricIDs)
dst = append(dst, buf...)
return dst
}
func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
if len(src)%8 != 0 {
return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src))
}
if len(src) < 8 {
return dst, fmt.Errorf("cannot unmarshal the number of tsids from %d bytes; require at least %d bytes", len(src), 8)
return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src))
}
n := encoding.UnmarshalUint64(src)
if n > ((1<<64)-1)/8 {
return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, uint64(((1<<64)-1)/8))
}
src = src[8:]
dstLen := len(dst)
if nn := dstLen + int(n) - cap(dst); nn > 0 {
dst = append(dst[:cap(dst)], make([]TSID, nn)...)
if n*8 != uint64(len(src)) {
return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
}
dst = dst[:dstLen+int(n)]
for i := 0; i < int(n); i++ {
tail, err := dst[dstLen+i].Unmarshal(src)
if err != nil {
return dst, fmt.Errorf("cannot unmarshal tsid #%d out of %d: %w", i, n, err)
}
src = tail
}
if len(src) > 0 {
return dst, fmt.Errorf("non-zero tail left after unmarshaling %d tsids; len(tail)=%d", n, len(src))
if n == 0 {
return dst, nil
}
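// Reinterpret the remaining src bytes as a []uint64 without copying, then append the values to dst.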
var metricIDs []uint64
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs))
sh.Data = uintptr(unsafe.Pointer(&src[0]))
sh.Cap = sh.Len
sh.Len = len(src) / 8
dst = append(dst, metricIDs...)
return dst, nil
}
@ -1783,8 +1756,10 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
return dmis, nil
}
// searchTSIDs returns sorted tsids matching the given tfss over the given tr.
func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
if len(tfss) == 0 {
return nil, nil
}
@ -1792,31 +1767,30 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
tfss = convertToCompositeTagFilterss(tfss)
}
qtChild := qt.NewChild("search for tsids in the current indexdb")
qtChild := qt.NewChild("search for metricIDs in the current indexdb")
tfKeyBuf := tagFiltersKeyBufPool.Get()
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
tsids, ok := db.getFromTagFiltersCache(qtChild, tfKeyBuf.B)
metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qtChild, tfKeyBuf.B)
if ok {
// Fast path - tsids found in the cache
// Fast path - metricIDs found in the cache
qtChild.Done()
return tsids, nil
return metricIDs, nil
}
// Slow path - search for tsids in the db and extDB.
// Slow path - search for metricIDs in the db and extDB.
is := db.getIndexSearch(deadline)
localTSIDs, err := is.searchTSIDs(qtChild, tfss, tr, maxMetrics)
localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
db.putIndexSearch(is)
if err != nil {
return nil, err
return nil, fmt.Errorf("error when searching for metricIDs in the current indexdb: %s", err)
}
qtChild.Done()
var extTSIDs []TSID
var extMetricIDs []uint64
if db.doExtDB(func(extDB *indexDB) {
qtChild := qt.NewChild("search for tsids in the previous indexdb")
qtChild := qt.NewChild("search for metricIDs in the previous indexdb")
defer qtChild.Done()
tfKeyExtBuf := tagFiltersKeyBufPool.Get()
@ -1824,36 +1798,111 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
// Data in extDB cannot be changed, so use unversioned keys for tag cache.
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
tsids, ok := extDB.getFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
metricIDs, ok := extDB.getMetricIDsFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
if ok {
extTSIDs = tsids
extMetricIDs = metricIDs
return
}
is := extDB.getIndexSearch(deadline)
extTSIDs, err = is.searchTSIDs(qtChild, tfss, tr, maxMetrics)
extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
extDB.putIndexSearch(is)
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
extDB.putToTagFiltersCache(qtChild, extTSIDs, tfKeyExtBuf.B)
extDB.putMetricIDsToTagFiltersCache(qtChild, extMetricIDs, tfKeyExtBuf.B)
}) {
if err != nil {
return nil, err
return nil, fmt.Errorf("error when searching for metricIDs in the previous indexdb: %s", err)
}
}
// Merge localTSIDs with extTSIDs.
tsids = mergeTSIDs(localTSIDs, extTSIDs)
qt.Printf("merge %d tsids from the current indexdb with %d tsids from the previous indexdb; result: %d tsids", len(localTSIDs), len(extTSIDs), len(tsids))
// Merge localMetricIDs with extMetricIDs.
metricIDs = mergeSortedMetricIDs(localMetricIDs, extMetricIDs)
qt.Printf("merge %d metricIDs from the current indexdb with %d metricIDs from the previous indexdb; result: %d metricIDs",
len(localMetricIDs), len(extMetricIDs), len(metricIDs))
// Store metricIDs in the cache.
db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B)
return metricIDs, nil
}
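// mergeSortedMetricIDs merges the sorted a and b into a single sorted slice, dropping metricIDs which are present in both slices.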
func mergeSortedMetricIDs(a, b []uint64) []uint64 {
if len(b) == 0 {
return a
}
i := 0
j := 0
result := make([]uint64, 0, len(a)+len(b))
for {
next := b[j]
start := i
for i < len(a) && a[i] <= next {
i++
}
result = append(result, a[start:i]...)
if len(result) > 0 {
last := result[len(result)-1]
for j < len(b) && b[j] == last {
j++
}
}
if i == len(a) {
return append(result, b[j:]...)
}
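// Swap a and b together with their positions, so the next iteration copies from the other slice.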
a, b = b, a
i, j = j, i
}
}
func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uint64, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("obtain tsids from %d metricIDs", len(metricIDs))
defer qt.Done()
if len(metricIDs) == 0 {
return nil, nil
}
tsids := make([]TSID, len(metricIDs))
is := db.getIndexSearch(deadline)
defer db.putIndexSearch(is)
i := 0
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i]
err := is.db.getFromMetricIDCache(tsid, metricID)
if err == nil {
// Fast path - the tsid for metricID is found in cache.
i++
continue
}
if err != io.EOF {
return nil, err
}
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
if err == io.EOF {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to unflushed entries.
// Just increment errors counter and skip it.
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
continue
}
return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
}
is.db.putToMetricIDCache(metricID, tsid)
i++
}
tsids = tsids[:i]
qt.Printf("load %d tsids from %d metricIDs", len(tsids), len(metricIDs))
// Sort the found tsids, since they must be passed to TSID search
// in the sorted order.
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
qt.Printf("sort %d tsids", len(tsids))
// Store TSIDs in the cache.
db.putToTagFiltersCache(qt, tsids, tfKeyBuf.B)
return tsids, err
return tsids, nil
}
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
@ -1928,30 +1977,6 @@ func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, er
return dst, nil
}
func mergeTSIDs(a, b []TSID) []TSID {
if len(b) > len(a) {
a, b = b, a
}
if len(b) == 0 {
return a
}
m := make(map[uint64]TSID, len(a))
for i := range a {
tsid := &a[i]
m[tsid.MetricID] = *tsid
}
for i := range b {
tsid := &b[i]
m[tsid.MetricID] = *tsid
}
tsids := make([]TSID, 0, len(m))
for _, tsid := range m {
tsids = append(tsids, tsid)
}
return tsids
}
func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
ts := &is.ts
kb := &is.kb
@ -1980,66 +2005,6 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
return true, nil
}
func (is *indexSearch) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
ok, err := is.containsTimeRange(tr)
if err != nil {
return nil, err
}
if !ok {
// Fast path - the index doesn't contain data for the given tr.
return nil, nil
}
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
if err != nil {
return nil, err
}
if len(metricIDs) == 0 {
// Nothing found.
return nil, nil
}
// Obtain TSID values for the given metricIDs.
tsids := make([]TSID, len(metricIDs))
i := 0
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i]
err := is.db.getFromMetricIDCache(tsid, metricID)
if err == nil {
// Fast path - the tsid for metricID is found in cache.
i++
continue
}
if err != io.EOF {
return nil, err
}
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
if err == io.EOF {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to unflushed entries.
// Just increment errors counter and skip it.
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
continue
}
return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
}
is.db.putToMetricIDCache(metricID, tsid)
i++
}
tsids = tsids[:i]
qt.Printf("load %d tsids from %d metric ids", len(tsids), len(metricIDs))
// Do not sort the found tsids, since they will be sorted later.
return tsids, nil
}
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
// There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller.
@ -2292,6 +2257,14 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer,
}
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
ok, err := is.containsTimeRange(tr)
if err != nil {
return nil, err
}
if !ok {
// Fast path - the index doesn't contain data for the given tr.
return nil, nil
}
metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
if err != nil {
return nil, err

View file

@ -22,6 +22,77 @@ import (
"github.com/VictoriaMetrics/fastcache"
)
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
f := func(metricIDs []uint64) {
t.Helper()
data := marshalMetricIDs(nil, metricIDs)
result, err := unmarshalMetricIDs(nil, data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(result, metricIDs) {
t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs)
}
}
f(nil)
f([]uint64{1})
f([]uint64{1234, 678932943, 843289893843})
}
func TestMergeSortedMetricIDs(t *testing.T) {
f := func(a, b []uint64) {
t.Helper()
m := make(map[uint64]bool)
var resultExpected []uint64
for _, v := range a {
if !m[v] {
m[v] = true
resultExpected = append(resultExpected, v)
}
}
for _, v := range b {
if !m[v] {
m[v] = true
resultExpected = append(resultExpected, v)
}
}
sort.Slice(resultExpected, func(i, j int) bool {
return resultExpected[i] < resultExpected[j]
})
result := mergeSortedMetricIDs(a, b)
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", a, b, result, resultExpected)
}
result = mergeSortedMetricIDs(b, a)
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", b, a, result, resultExpected)
}
}
f(nil, nil)
f([]uint64{1}, nil)
f(nil, []uint64{23})
f([]uint64{1234}, []uint64{0})
f([]uint64{1}, []uint64{1})
f([]uint64{1}, []uint64{1, 2, 3})
f([]uint64{1, 2, 3}, []uint64{1, 2, 3})
f([]uint64{1, 2, 3}, []uint64{2, 3})
f([]uint64{0, 1, 7, 8, 9, 13, 20}, []uint64{1, 2, 7, 13, 15})
f([]uint64{0, 1, 2, 3, 4}, []uint64{5, 6, 7, 8})
f([]uint64{0, 1, 2, 3, 4}, []uint64{4, 5, 6, 7, 8})
f([]uint64{0, 1, 2, 3, 4}, []uint64{3, 4, 5, 6, 7, 8})
f([]uint64{2, 3, 4}, []uint64{1, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 4, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7, 8}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{2, 3, 4, 5, 6, 7})
}
func TestReverseBytes(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
@ -415,47 +486,6 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) {
f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2})
}
func TestMarshalUnmarshalTSIDs(t *testing.T) {
f := func(tsids []TSID) {
t.Helper()
value := marshalTSIDs(nil, tsids)
tsidsGot, err := unmarshalTSIDs(nil, value)
if err != nil {
t.Fatalf("cannot unmarshal tsids: %s", err)
}
if len(tsids) == 0 && len(tsidsGot) != 0 || len(tsids) > 0 && !reflect.DeepEqual(tsids, tsidsGot) {
t.Fatalf("unexpected tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot, tsids)
}
// Try marshlaing with prefix
prefix := []byte("prefix")
valueExt := marshalTSIDs(prefix, tsids)
if !bytes.Equal(valueExt[:len(prefix)], prefix) {
t.Fatalf("unexpected prefix after marshaling;\ngot\n%X\nwant\n%X", valueExt[:len(prefix)], prefix)
}
if !bytes.Equal(valueExt[len(prefix):], value) {
t.Fatalf("unexpected prefixed marshaled value;\ngot\n%X\nwant\n%X", valueExt[len(prefix):], value)
}
// Try unmarshaling with prefix
tsidPrefix := []TSID{{MetricID: 123}, {JobID: 456}}
tsidsGot, err = unmarshalTSIDs(tsidPrefix, value)
if err != nil {
t.Fatalf("cannot unmarshal prefixed tsids: %s", err)
}
if !reflect.DeepEqual(tsidsGot[:len(tsidPrefix)], tsidPrefix) {
t.Fatalf("unexpected tsid prefix\ngot\n%+v\nwant\n%+v", tsidsGot[:len(tsidPrefix)], tsidPrefix)
}
if len(tsids) == 0 && len(tsidsGot) != len(tsidPrefix) || len(tsids) > 0 && !reflect.DeepEqual(tsidsGot[len(tsidPrefix):], tsids) {
t.Fatalf("unexpected prefixed tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot[len(tsidPrefix):], tsids)
}
}
f(nil)
f([]TSID{{MetricID: 123}})
f([]TSID{{JobID: 34}, {MetricID: 2343}, {InstanceID: 243321}})
}
func TestIndexDBOpenClose(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
@ -838,7 +868,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, false); err != nil {
return fmt.Errorf("cannot add no-op negative filter: %w", err)
}
tsidsFound, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err)
}
@ -847,7 +877,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Verify tag cache.
tsidsCached, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsCached, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err)
}
@ -859,7 +889,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
}
@ -877,7 +907,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, []byte(re), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for Graphite wildcard")
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
}
@ -894,7 +924,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add([]byte("non-existent-tag"), []byte("foo|"), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for non-existing tag: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search with a filter matching empty tag: %w", err)
}
@ -914,7 +944,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add([]byte("non-existent-tag2"), []byte("bar|"), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for non-existing tag2: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search with multipel filters matching empty tags: %w", err)
}
@ -942,7 +972,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, true); err != nil {
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter: %w", err)
}
@ -952,7 +982,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
}
@ -968,7 +998,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
}
@ -984,7 +1014,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
// Search with empty filter. It should match all the results.
tfs.Reset()
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search for common prefix: %w", err)
}
@ -997,7 +1027,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
}
@ -1014,7 +1044,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs1, tfs2}, tr)
if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
}
@ -1023,7 +1053,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Verify empty tfss
tsidsFound, err = db.searchTSIDs(nil, nil, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, nil, tr)
if err != nil {
return fmt.Errorf("cannot search for nil tfss: %w", err)
}
@ -1035,6 +1065,14 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return nil
}
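// searchTSIDsInTest chains searchMetricIDs and getTSIDsFromMetricIDs in order to emulate the removed db.searchTSIDs in tests.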
func searchTSIDsInTest(db *indexDB, tfs []*TagFilters, tr TimeRange) ([]TSID, error) {
metricIDs, err := db.searchMetricIDs(nil, tfs, tr, 1e5, noDeadline)
if err != nil {
return nil, err
}
return db.getTSIDsFromMetricIDs(nil, metricIDs, noDeadline)
}
func testHasTSID(tsids []TSID, tsid *TSID) bool {
for i := range tsids {
if tsids[i] == *tsid {
@ -1753,7 +1791,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(now - 2*msecPerHour - 1),
MaxTimestamp: int64(now),
}
matchedTSIDs, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline)
matchedTSIDs, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}
@ -1807,7 +1845,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MaxTimestamp: int64(now),
}
matchedTSIDs, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline)
matchedTSIDs, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}

View file

@ -93,6 +93,7 @@ type Search struct {
// MetricBlockRef is updated with each Search.NextMetricBlock call.
MetricBlockRef MetricBlockRef
// idb is used for MetricName lookup for the found data blocks.
idb *indexDB
ts tableSearch
@ -143,16 +144,21 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
}
s.reset()
s.idb = storage.idb()
s.tr = tr
s.tfss = tfss
s.deadline = deadline
s.needClosing = true
tsids, err := storage.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
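// The search is now split in two steps: first locate the matching metricIDs, then resolve them to TSIDs and pre-fetch the corresponding metric names.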
var tsids []TSID
metricIDs, err := s.idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
if err == nil {
err = storage.prefetchMetricNames(qt, tsids, deadline)
tsids, err = s.idb.getTSIDsFromMetricIDs(qt, metricIDs, deadline)
if err == nil {
err = storage.prefetchMetricNames(qt, metricIDs, deadline)
}
}
// It is ok to call Init on error from storage.searchTSIDs.
// It is ok to call Init on non-nil err.
// Init must be called before returning because it will fail
// on Search.MustClose otherwise.
s.ts.Init(storage.tb, tsids, tr)
@ -161,8 +167,6 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
s.err = err
return 0
}
s.idb = storage.idb()
return len(tsids)
}

View file

@ -50,9 +50,6 @@ type Storage struct {
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
@ -459,11 +456,6 @@ type Metrics struct {
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
SearchTSIDsConcurrencyLimitReached uint64
SearchTSIDsConcurrencyLimitTimeout uint64
SearchTSIDsConcurrencyCapacity uint64
SearchTSIDsConcurrencyCurrent uint64
SearchDelays uint64
SlowRowInserts uint64
@ -541,11 +533,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
m.SearchTSIDsConcurrencyLimitReached += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitReached)
m.SearchTSIDsConcurrencyLimitTimeout += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitTimeout)
m.SearchTSIDsConcurrencyCapacity = uint64(cap(searchTSIDsConcurrencyCh))
m.SearchTSIDsConcurrencyCurrent = uint64(len(searchTSIDsConcurrencyCh))
m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
@ -1106,27 +1093,26 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration {
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
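// Metric names can be resolved from metricIDs directly, so there is no need to load TSIDs here.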
metricIDs, err := s.idb().searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tsids) == 0 {
if len(metricIDs) == 0 {
return nil, nil
}
if err = s.prefetchMetricNames(qt, tsids, deadline); err != nil {
if err = s.prefetchMetricNames(qt, metricIDs, deadline); err != nil {
return nil, err
}
idb := s.idb()
metricNames := make([]string, 0, len(tsids))
metricNamesSeen := make(map[string]struct{}, len(tsids))
metricNames := make([]string, 0, len(metricIDs))
metricNamesSeen := make(map[string]struct{}, len(metricIDs))
var metricName []byte
for i := range tsids {
for i, metricID := range metricIDs {
if i&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(deadline); err != nil {
return nil, err
}
}
metricID := tsids[i].MetricID
var err error
metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID)
if err != nil {
@ -1148,75 +1134,25 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
return metricNames, nil
}
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("search for matching tsids: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
// Limit the number of concurrent goroutines that may search TSIDS in the storage.
// This should prevent from out of memory errors and CPU thrashing when too many
// goroutines call searchTSIDs.
select {
case searchTSIDsConcurrencyCh <- struct{}{}:
default:
// Sleep for a while until giving up
atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1)
currentTime := fasttime.UnixTimestamp()
timeoutSecs := uint64(0)
if currentTime < deadline {
timeoutSecs = deadline - currentTime
}
timeout := time.Second * time.Duration(timeoutSecs)
t := timerpool.Get(timeout)
select {
case searchTSIDsConcurrencyCh <- struct{}{}:
qt.Printf("wait in the queue because %d concurrent search requests are already performed", cap(searchTSIDsConcurrencyCh))
timerpool.Put(t)
case <-t.C:
timerpool.Put(t)
atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1)
return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load",
cap(searchTSIDsConcurrencyCh), timeout.Seconds())
}
}
tsids, err := s.idb().searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
<-searchTSIDsConcurrencyCh
if err != nil {
return nil, fmt.Errorf("error when searching tsids: %w", err)
}
return tsids, nil
}
var (
// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
// is CPU bound and sometimes disk IO bound, so there is no sense in running more
// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
)
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
// prefetchMetricNames pre-fetches metric names for the given metricIDs into metricID->metricName cache.
//
// This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids.
func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, tsids []TSID, deadline uint64) error {
qt = qt.NewChild("prefetch metric names for %d tsids", len(tsids))
// This should speed-up further searchMetricNameWithCache calls for srcMetricIDs from tsids.
func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uint64, deadline uint64) error {
qt = qt.NewChild("prefetch metric names for %d metricIDs", len(srcMetricIDs))
defer qt.Done()
if len(tsids) == 0 {
if len(srcMetricIDs) == 0 {
qt.Printf("nothing to prefetch")
return nil
}
var metricIDs uint64Sorter
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
for i := range tsids {
tsid := &tsids[i]
metricID := tsid.MetricID
for _, metricID := range srcMetricIDs {
if prefetchedMetricIDs.Has(metricID) {
continue
}
metricIDs = append(metricIDs, metricID)
}
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(tsids))
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
if len(metricIDs) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline.
qt.Printf("skip pre-fetching metric names for low number of metric ids=%d", len(metricIDs))