lib/storage: subsitute searchTSIDs functions with more lightweight searchMetricIDs function

The searchTSIDs function was searching for metricIDs matching the the given tag filters
and then was locating the corresponding TSID entries for the found metricIDs.

The TSID entries aren't needed when searching for time series names (aka MetricName),
so this commit removes the uneeded TSID search from the implementation of /api/v1/series API.
This improves perfromance of /api/v1/series calls.

This commit also improves performance a bit for /api/v1/query and /api/v1/query_range calls,
since now these calls cache small metricIDs instead of big TSID entries
in the indexdb/tagFilters cache (now this cache is named indexdb/tagFiltersToMetricIDs)
without the need to compress the saved entries in order to save cache space.

This commit also removes concurrency limiter during searching for matching time series,
which was introduced in 8f16388428, since the concurrency
for all the read queries is already limited with -search.maxConcurrentRequests command-line flag.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
This commit is contained in:
Aliaksandr Valialkin 2022-10-23 12:15:24 +03:00
parent a10647e0bf
commit 5e4dfe50c6
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
10 changed files with 326 additions and 381 deletions

View file

@ -2309,7 +2309,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -57,10 +57,14 @@ var (
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
)
// CheckTimeRange returns true if the given tr is denied for querying.
@ -100,7 +104,7 @@ func InitWithoutMetrics(resetCacheIfNeeded func(mrs []storage.MetricRow)) {
storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.N)
storage.SetTagFilterCacheSize(cacheSizeIndexDBTagFilters.N)
storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.N)
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N)
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N)
@ -601,19 +605,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().AddRowsConcurrencyCurrent)
})
metrics.NewGauge(`vm_concurrent_search_tsids_limit_reached_total`, func() float64 {
return float64(m().SearchTSIDsConcurrencyLimitReached)
})
metrics.NewGauge(`vm_concurrent_search_tsids_limit_timeout_total`, func() float64 {
return float64(m().SearchTSIDsConcurrencyLimitTimeout)
})
metrics.NewGauge(`vm_concurrent_search_tsids_capacity`, func() float64 {
return float64(m().SearchTSIDsConcurrencyCapacity)
})
metrics.NewGauge(`vm_concurrent_search_tsids_current`, func() float64 {
return float64(m().SearchTSIDsConcurrencyCurrent)
})
metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays)
})
@ -717,8 +708,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_entries{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSize)
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheSize)
})
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize())
@ -758,8 +749,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 {
return float64(m().NextDayMetricIDCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeBytes)
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheSizeBytes)
})
metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSizeBytes())
@ -789,8 +780,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeMaxBytes)
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheSizeMaxBytes)
})
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMaxSizeBytes())
@ -817,8 +808,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheRequests)
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheRequests)
})
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheRequests())
@ -845,8 +836,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 {
return float64(idbm().TagFiltersCacheMisses)
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersToMetricIDsCacheMisses)
})
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMisses())

View file

@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
**Update note 1:** the `indexdb/tagFilters` cache type at [/metrics](https://docs.victoriametrics.com/#monitoring) has been renamed to `indexdb/tagFiltersToMetricIDs` in order to make its puropose more clear.
* FEATURE: allow limiting memory usage on a per-query basis with `-search.maxMemoryPerQuery` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): drop all the labels with `__` prefix from discovered targets in the same way as Prometheus does according to [this article](https://www.robustperception.io/life-of-a-label/). Previously the following labels were available during [metric-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs): `__address__`, `__scheme__`, `__metrics_path__`, `__scrape_interval__`, `__scrape_timeout__`, `__param_*`. Now these labels are available only during [target-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). This should reduce CPU usage and memory usage for `vmagent` setups, which scrape big number of targets.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for metric-level [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling), which can be applied via `metric_relabel_configs` section at [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs), via `-remoteWrite.relabelConfig` or via `-remoteWrite.urlRelabelConfig` command-line options.
@ -41,6 +43,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): limit the number of plotted series. This should prevent from browser crashes or hangs when the query returns big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3155).
* FEATURE: log error if some environment variables referred at `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent from silent using incorrect config files.
* FEATURE: immediately shut down VictoriaMetrics apps on the second SIGINT or SIGTERM signal if they couldn't be finished gracefully for some reason after receiving the first signal.
* FEATURE: improve the performance of [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) endpoint by eliminating loading of unused `TSID` data during the API call.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_requests_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts for sending data to remote storage from `error` to `warn`. The message for about all failed send attempts remains at `error` severity level.

View file

@ -1157,7 +1157,7 @@ Below is the output for `/path/to/vmstorage -help`:
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -2310,7 +2310,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -2313,7 +2313,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"path/filepath"
"reflect"
"sort"
"strconv"
"sync"
@ -97,8 +98,8 @@ type indexDB struct {
extDB *indexDB
extDBLock sync.Mutex
// Cache for fast TagFilters -> TSIDs lookup.
tagFiltersCache *workingsetcache.Cache
// Cache for fast TagFilters -> MetricIDs lookup.
tagFiltersToMetricIDsCache *workingsetcache.Cache
// The parent storage.
s *Storage
@ -110,18 +111,18 @@ type indexDB struct {
indexSearchPool sync.Pool
}
var maxTagFilterCacheSize int
var maxTagFiltersCacheSize int
// SetTagFilterCacheSize overrides the default size of indexdb/tagFilters cache
func SetTagFilterCacheSize(size int) {
maxTagFilterCacheSize = size
// SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache
func SetTagFiltersCacheSize(size int) {
maxTagFiltersCacheSize = size
}
func getTagFilterCacheSize() int {
if maxTagFilterCacheSize <= 0 {
func getTagFiltersCacheSize() int {
if maxTagFiltersCacheSize <= 0 {
return int(float64(memory.Allowed()) / 32)
}
return maxTagFilterCacheSize
return maxTagFiltersCacheSize
}
// openIndexDB opens index db from the given path.
@ -147,8 +148,9 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
}
// Do not persist tagFiltersCache in files, since it is very volatile.
// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
mem := memory.Allowed()
tagFiltersCacheSize := getTagFiltersCacheSize()
db := &indexDB{
refCount: 1,
@ -157,7 +159,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
tb: tb,
name: name,
tagFiltersCache: workingsetcache.New(getTagFilterCacheSize()),
tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
}
@ -168,11 +170,11 @@ const noDeadline = 1<<64 - 1
// IndexDBMetrics contains essential metrics for indexDB.
type IndexDBMetrics struct {
TagFiltersCacheSize uint64
TagFiltersCacheSizeBytes uint64
TagFiltersCacheSizeMaxBytes uint64
TagFiltersCacheRequests uint64
TagFiltersCacheMisses uint64
TagFiltersToMetricIDsCacheSize uint64
TagFiltersToMetricIDsCacheSizeBytes uint64
TagFiltersToMetricIDsCacheSizeMaxBytes uint64
TagFiltersToMetricIDsCacheRequests uint64
TagFiltersToMetricIDsCacheMisses uint64
DeletedMetricsCount uint64
@ -210,12 +212,12 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
var cs fastcache.Stats
cs.Reset()
db.tagFiltersCache.UpdateStats(&cs)
m.TagFiltersCacheSize += cs.EntriesCount
m.TagFiltersCacheSizeBytes += cs.BytesSize
m.TagFiltersCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersCacheRequests += cs.GetCalls
m.TagFiltersCacheMisses += cs.Misses
db.tagFiltersToMetricIDsCache.UpdateStats(&cs)
m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount
m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize
m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls
m.TagFiltersToMetricIDsCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
@ -296,10 +298,10 @@ func (db *indexDB) decRef() {
db.SetExtDB(nil)
// Free space occupied by caches owned by db.
db.tagFiltersCache.Stop()
db.tagFiltersToMetricIDsCache.Stop()
db.loopsPerDateTagFilterCache.Stop()
db.tagFiltersCache = nil
db.tagFiltersToMetricIDsCache = nil
db.s = nil
db.loopsPerDateTagFilterCache = nil
@ -312,74 +314,36 @@ func (db *indexDB) decRef() {
logger.Infof("indexDB %q has been dropped", tbPath)
}
func (db *indexDB) getFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]TSID, bool) {
qt = qt.NewChild("search for tsids in tag filters cache")
var tagBufPool bytesutil.ByteBufferPool
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) {
qt = qt.NewChild("search for metricIDs in tag filters cache")
defer qt.Done()
compressedBuf := tagBufPool.Get()
defer tagBufPool.Put(compressedBuf)
compressedBuf.B = db.tagFiltersCache.GetBig(compressedBuf.B[:0], key)
if len(compressedBuf.B) == 0 {
buf := tagBufPool.Get()
defer tagBufPool.Put(buf)
buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key)
if len(buf.B) == 0 {
qt.Printf("cache miss")
return nil, false
}
if compressedBuf.B[0] == 0 {
// Fast path - tsids are stored in uncompressed form.
qt.Printf("found tsids with size: %d bytes", len(compressedBuf.B))
tsids, err := unmarshalTSIDs(nil, compressedBuf.B[1:])
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
}
qt.Printf("unmarshaled %d tsids", len(tsids))
return tsids, true
}
// Slow path - tsids are stored in compressed form.
qt.Printf("found tsids with compressed size: %d bytes", len(compressedBuf.B))
buf := tagBufPool.Get()
defer tagBufPool.Put(buf)
var err error
buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressedBuf.B[1:])
qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
metricIDs, err := unmarshalMetricIDs(nil, buf.B)
if err != nil {
logger.Panicf("FATAL: cannot decompress tsids from tagFiltersCache: %s", err)
logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
}
qt.Printf("decompressed tsids to %d bytes", len(buf.B))
tsids, err := unmarshalTSIDs(nil, buf.B)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
}
qt.Printf("unmarshaled %d tsids", len(tsids))
return tsids, true
qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
return metricIDs, true
}
var tagBufPool bytesutil.ByteBufferPool
func (db *indexDB) putToTagFiltersCache(qt *querytracer.Tracer, tsids []TSID, key []byte) {
qt = qt.NewChild("put %d tsids in cache", len(tsids))
func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) {
qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs))
defer qt.Done()
if len(tsids) <= 2 {
// Fast path - store small number of tsids in uncompressed form.
// This saves CPU time on compress / decompress.
buf := tagBufPool.Get()
buf.B = append(buf.B[:0], 0)
buf.B = marshalTSIDs(buf.B, tsids)
qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B))
db.tagFiltersCache.SetBig(key, buf.B)
qt.Printf("store %d tsids into cache", len(tsids))
tagBufPool.Put(buf)
return
}
// Slower path - store big number of tsids in compressed form.
// This increases cache capacity.
buf := tagBufPool.Get()
buf.B = marshalTSIDs(buf.B[:0], tsids)
qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B))
compressedBuf := tagBufPool.Get()
compressedBuf.B = append(compressedBuf.B[:0], 1)
compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B, buf.B, 1)
qt.Printf("compressed %d tsids into %d bytes", len(tsids), len(compressedBuf.B))
buf.B = marshalMetricIDs(buf.B, metricIDs)
qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B))
db.tagFiltersToMetricIDsCache.SetBig(key, buf.B)
qt.Printf("stored %d metricIDs into cache", len(metricIDs))
tagBufPool.Put(buf)
db.tagFiltersCache.SetBig(key, compressedBuf.B)
qt.Printf("stored %d compressed tsids into cache", len(tsids))
tagBufPool.Put(compressedBuf)
}
func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
@ -475,35 +439,44 @@ func invalidateTagFiltersCache() {
var tagFiltersKeyGen uint64
func marshalTSIDs(dst []byte, tsids []TSID) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(tsids)))
for i := range tsids {
dst = tsids[i].Marshal(dst)
func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
if len(metricIDs) == 0 {
return dst
}
var buf []byte
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
sh.Cap = sh.Len
sh.Len = 8 * len(metricIDs)
dst = append(dst, buf...)
return dst
}
func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) {
func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
if len(src)%8 != 0 {
return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src))
}
if len(src) < 8 {
return dst, fmt.Errorf("cannot unmarshal the number of tsids from %d bytes; require at least %d bytes", len(src), 8)
return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src))
}
n := encoding.UnmarshalUint64(src)
if n > ((1<<64)-1)/8 {
return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, ((1<<64)-1)/8)
}
src = src[8:]
dstLen := len(dst)
if nn := dstLen + int(n) - cap(dst); nn > 0 {
dst = append(dst[:cap(dst)], make([]TSID, nn)...)
if n*8 != uint64(len(src)) {
return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
}
dst = dst[:dstLen+int(n)]
for i := 0; i < int(n); i++ {
tail, err := dst[dstLen+i].Unmarshal(src)
if err != nil {
return dst, fmt.Errorf("cannot unmarshal tsid #%d out of %d: %w", i, n, err)
}
src = tail
}
if len(src) > 0 {
return dst, fmt.Errorf("non-zero tail left after unmarshaling %d tsids; len(tail)=%d", n, len(src))
if n == 0 {
return dst, nil
}
var metricIDs []uint64
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs))
sh.Data = uintptr(unsafe.Pointer(&src[0]))
sh.Cap = sh.Len
sh.Len = len(src) / 8
dst = append(dst, metricIDs...)
return dst, nil
}
@ -1783,8 +1756,10 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
return dmis, nil
}
// searchTSIDs returns sorted tsids matching the given tfss over the given tr.
func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
if len(tfss) == 0 {
return nil, nil
}
@ -1792,31 +1767,30 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
tfss = convertToCompositeTagFilterss(tfss)
}
qtChild := qt.NewChild("search for tsids in the current indexdb")
qtChild := qt.NewChild("search for metricIDs in the current indexdb")
tfKeyBuf := tagFiltersKeyBufPool.Get()
defer tagFiltersKeyBufPool.Put(tfKeyBuf)
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
tsids, ok := db.getFromTagFiltersCache(qtChild, tfKeyBuf.B)
metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qtChild, tfKeyBuf.B)
if ok {
// Fast path - tsids found in the cache
// Fast path - metricIDs found in the cache
qtChild.Done()
return tsids, nil
return metricIDs, nil
}
// Slow path - search for tsids in the db and extDB.
// Slow path - search for metricIDs in the db and extDB.
is := db.getIndexSearch(deadline)
localTSIDs, err := is.searchTSIDs(qtChild, tfss, tr, maxMetrics)
localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
db.putIndexSearch(is)
if err != nil {
return nil, err
return nil, fmt.Errorf("error when searching for metricIDs in the current indexdb: %s", err)
}
qtChild.Done()
var extTSIDs []TSID
var extMetricIDs []uint64
if db.doExtDB(func(extDB *indexDB) {
qtChild := qt.NewChild("search for tsids in the previous indexdb")
qtChild := qt.NewChild("search for metricIDs in the previous indexdb")
defer qtChild.Done()
tfKeyExtBuf := tagFiltersKeyBufPool.Get()
@ -1824,36 +1798,111 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
// Data in extDB cannot be changed, so use unversioned keys for tag cache.
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
tsids, ok := extDB.getFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
metricIDs, ok := extDB.getMetricIDsFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
if ok {
extTSIDs = tsids
extMetricIDs = metricIDs
return
}
is := extDB.getIndexSearch(deadline)
extTSIDs, err = is.searchTSIDs(qtChild, tfss, tr, maxMetrics)
extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
extDB.putIndexSearch(is)
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
extDB.putToTagFiltersCache(qtChild, extTSIDs, tfKeyExtBuf.B)
extDB.putMetricIDsToTagFiltersCache(qtChild, extMetricIDs, tfKeyExtBuf.B)
}) {
if err != nil {
return nil, err
return nil, fmt.Errorf("error when searching for metricIDs in the previous indexdb: %s", err)
}
}
// Merge localTSIDs with extTSIDs.
tsids = mergeTSIDs(localTSIDs, extTSIDs)
qt.Printf("merge %d tsids from the current indexdb with %d tsids from the previous indexdb; result: %d tsids", len(localTSIDs), len(extTSIDs), len(tsids))
// Merge localMetricIDs with extMetricIDs.
metricIDs = mergeSortedMetricIDs(localMetricIDs, extMetricIDs)
qt.Printf("merge %d metricIDs from the current indexdb with %d metricIDs from the previous indexdb; result: %d metricIDs",
len(localMetricIDs), len(extMetricIDs), len(metricIDs))
// Store metricIDs in the cache.
db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B)
return metricIDs, nil
}
func mergeSortedMetricIDs(a, b []uint64) []uint64 {
if len(b) == 0 {
return a
}
i := 0
j := 0
result := make([]uint64, 0, len(a)+len(b))
for {
next := b[j]
start := i
for i < len(a) && a[i] <= next {
i++
}
result = append(result, a[start:i]...)
if len(result) > 0 {
last := result[len(result)-1]
for j < len(b) && b[j] == last {
j++
}
}
if i == len(a) {
return append(result, b[j:]...)
}
a, b = b, a
i, j = j, i
}
}
func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uint64, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("obtain tsids from %d metricIDs", len(metricIDs))
defer qt.Done()
if len(metricIDs) == 0 {
return nil, nil
}
tsids := make([]TSID, len(metricIDs))
is := db.getIndexSearch(deadline)
defer db.putIndexSearch(is)
i := 0
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i]
err := is.db.getFromMetricIDCache(tsid, metricID)
if err == nil {
// Fast path - the tsid for metricID is found in cache.
i++
continue
}
if err != io.EOF {
return nil, err
}
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
if err == io.EOF {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to unflushed entries.
// Just increment errors counter and skip it.
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
continue
}
return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
}
is.db.putToMetricIDCache(metricID, tsid)
i++
}
tsids = tsids[:i]
qt.Printf("load %d tsids from %d metricIDs", len(tsids), len(metricIDs))
// Sort the found tsids, since they must be passed to TSID search
// in the sorted order.
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
qt.Printf("sort %d tsids", len(tsids))
// Store TSIDs in the cache.
db.putToTagFiltersCache(qt, tsids, tfKeyBuf.B)
return tsids, err
return tsids, nil
}
var tagFiltersKeyBufPool bytesutil.ByteBufferPool
@ -1928,30 +1977,6 @@ func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, er
return dst, nil
}
func mergeTSIDs(a, b []TSID) []TSID {
if len(b) > len(a) {
a, b = b, a
}
if len(b) == 0 {
return a
}
m := make(map[uint64]TSID, len(a))
for i := range a {
tsid := &a[i]
m[tsid.MetricID] = *tsid
}
for i := range b {
tsid := &b[i]
m[tsid.MetricID] = *tsid
}
tsids := make([]TSID, 0, len(m))
for _, tsid := range m {
tsids = append(tsids, tsid)
}
return tsids
}
func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
ts := &is.ts
kb := &is.kb
@ -1980,66 +2005,6 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
return true, nil
}
func (is *indexSearch) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
ok, err := is.containsTimeRange(tr)
if err != nil {
return nil, err
}
if !ok {
// Fast path - the index doesn't contain data for the given tr.
return nil, nil
}
metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
if err != nil {
return nil, err
}
if len(metricIDs) == 0 {
// Nothing found.
return nil, nil
}
// Obtain TSID values for the given metricIDs.
tsids := make([]TSID, len(metricIDs))
i := 0
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i]
err := is.db.getFromMetricIDCache(tsid, metricID)
if err == nil {
// Fast path - the tsid for metricID is found in cache.
i++
continue
}
if err != io.EOF {
return nil, err
}
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
if err == io.EOF {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to unflushed entries.
// Just increment errors counter and skip it.
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
continue
}
return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
}
is.db.putToMetricIDCache(metricID, tsid)
i++
}
tsids = tsids[:i]
qt.Printf("load %d tsids from %d metric ids", len(tsids), len(metricIDs))
// Do not sort the found tsids, since they will be sorted later.
return tsids, nil
}
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
// There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller.
@ -2292,6 +2257,14 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer,
}
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
ok, err := is.containsTimeRange(tr)
if err != nil {
return nil, err
}
if !ok {
// Fast path - the index doesn't contain data for the given tr.
return nil, nil
}
metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
if err != nil {
return nil, err

View file

@ -22,6 +22,77 @@ import (
"github.com/VictoriaMetrics/fastcache"
)
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
f := func(metricIDs []uint64) {
t.Helper()
data := marshalMetricIDs(nil, metricIDs)
result, err := unmarshalMetricIDs(nil, data)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if !reflect.DeepEqual(result, metricIDs) {
t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", result, metricIDs)
}
}
f(nil)
f([]uint64{1})
f([]uint64{1234, 678932943, 843289893843})
}
func TestMergeSortedMetricIDs(t *testing.T) {
f := func(a, b []uint64) {
t.Helper()
m := make(map[uint64]bool)
var resultExpected []uint64
for _, v := range a {
if !m[v] {
m[v] = true
resultExpected = append(resultExpected, v)
}
}
for _, v := range b {
if !m[v] {
m[v] = true
resultExpected = append(resultExpected, v)
}
}
sort.Slice(resultExpected, func(i, j int) bool {
return resultExpected[i] < resultExpected[j]
})
result := mergeSortedMetricIDs(a, b)
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", a, b, result, resultExpected)
}
result = mergeSortedMetricIDs(b, a)
if !reflect.DeepEqual(result, resultExpected) {
t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", b, a, result, resultExpected)
}
}
f(nil, nil)
f([]uint64{1}, nil)
f(nil, []uint64{23})
f([]uint64{1234}, []uint64{0})
f([]uint64{1}, []uint64{1})
f([]uint64{1}, []uint64{1, 2, 3})
f([]uint64{1, 2, 3}, []uint64{1, 2, 3})
f([]uint64{1, 2, 3}, []uint64{2, 3})
f([]uint64{0, 1, 7, 8, 9, 13, 20}, []uint64{1, 2, 7, 13, 15})
f([]uint64{0, 1, 2, 3, 4}, []uint64{5, 6, 7, 8})
f([]uint64{0, 1, 2, 3, 4}, []uint64{4, 5, 6, 7, 8})
f([]uint64{0, 1, 2, 3, 4}, []uint64{3, 4, 5, 6, 7, 8})
f([]uint64{2, 3, 4}, []uint64{1, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 4, 5, 6, 7})
f([]uint64{2, 3, 4}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7, 8}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{2, 3, 4, 5, 6, 7})
}
func TestReverseBytes(t *testing.T) {
f := func(s, resultExpected string) {
t.Helper()
@ -415,47 +486,6 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) {
f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2})
}
func TestMarshalUnmarshalTSIDs(t *testing.T) {
f := func(tsids []TSID) {
t.Helper()
value := marshalTSIDs(nil, tsids)
tsidsGot, err := unmarshalTSIDs(nil, value)
if err != nil {
t.Fatalf("cannot unmarshal tsids: %s", err)
}
if len(tsids) == 0 && len(tsidsGot) != 0 || len(tsids) > 0 && !reflect.DeepEqual(tsids, tsidsGot) {
t.Fatalf("unexpected tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot, tsids)
}
// Try marshlaing with prefix
prefix := []byte("prefix")
valueExt := marshalTSIDs(prefix, tsids)
if !bytes.Equal(valueExt[:len(prefix)], prefix) {
t.Fatalf("unexpected prefix after marshaling;\ngot\n%X\nwant\n%X", valueExt[:len(prefix)], prefix)
}
if !bytes.Equal(valueExt[len(prefix):], value) {
t.Fatalf("unexpected prefixed marshaled value;\ngot\n%X\nwant\n%X", valueExt[len(prefix):], value)
}
// Try unmarshaling with prefix
tsidPrefix := []TSID{{MetricID: 123}, {JobID: 456}}
tsidsGot, err = unmarshalTSIDs(tsidPrefix, value)
if err != nil {
t.Fatalf("cannot unmarshal prefixed tsids: %s", err)
}
if !reflect.DeepEqual(tsidsGot[:len(tsidPrefix)], tsidPrefix) {
t.Fatalf("unexpected tsid prefix\ngot\n%+v\nwant\n%+v", tsidsGot[:len(tsidPrefix)], tsidPrefix)
}
if len(tsids) == 0 && len(tsidsGot) != len(tsidPrefix) || len(tsids) > 0 && !reflect.DeepEqual(tsidsGot[len(tsidPrefix):], tsids) {
t.Fatalf("unexpected prefixed tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot[len(tsidPrefix):], tsids)
}
}
f(nil)
f([]TSID{{MetricID: 123}})
f([]TSID{{JobID: 34}, {MetricID: 2343}, {InstanceID: 243321}})
}
func TestIndexDBOpenClose(t *testing.T) {
s := newTestStorage()
defer stopTestStorage(s)
@ -838,7 +868,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, false); err != nil {
return fmt.Errorf("cannot add no-op negative filter: %w", err)
}
tsidsFound, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err)
}
@ -847,7 +877,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Verify tag cache.
tsidsCached, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsCached, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err)
}
@ -859,7 +889,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
}
@ -877,7 +907,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, []byte(re), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for Graphite wildcard")
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
}
@ -894,7 +924,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add([]byte("non-existent-tag"), []byte("foo|"), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for non-existing tag: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search with a filter matching empty tag: %w", err)
}
@ -914,7 +944,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add([]byte("non-existent-tag2"), []byte("bar|"), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for non-existing tag2: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search with multipel filters matching empty tags: %w", err)
}
@ -942,7 +972,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, true); err != nil {
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter: %w", err)
}
@ -952,7 +982,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
}
@ -968,7 +998,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
}
@ -984,7 +1014,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
// Search with empty filter. It should match all the results.
tfs.Reset()
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search for common prefix: %w", err)
}
@ -997,7 +1027,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
}
@ -1014,7 +1044,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
}
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs1, tfs2}, tr)
if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err)
}
@ -1023,7 +1053,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
}
// Verify empty tfss
tsidsFound, err = db.searchTSIDs(nil, nil, tr, 1e5, noDeadline)
tsidsFound, err = searchTSIDsInTest(db, nil, tr)
if err != nil {
return fmt.Errorf("cannot search for nil tfss: %w", err)
}
@ -1035,6 +1065,14 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return nil
}
func searchTSIDsInTest(db *indexDB, tfs []*TagFilters, tr TimeRange) ([]TSID, error) {
metricIDs, err := db.searchMetricIDs(nil, tfs, tr, 1e5, noDeadline)
if err != nil {
return nil, err
}
return db.getTSIDsFromMetricIDs(nil, metricIDs, noDeadline)
}
func testHasTSID(tsids []TSID, tsid *TSID) bool {
for i := range tsids {
if tsids[i] == *tsid {
@ -1753,7 +1791,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(now - 2*msecPerHour - 1),
MaxTimestamp: int64(now),
}
matchedTSIDs, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline)
matchedTSIDs, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}
@ -1807,7 +1845,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MaxTimestamp: int64(now),
}
matchedTSIDs, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline)
matchedTSIDs, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil {
t.Fatalf("error searching tsids: %v", err)
}

View file

@ -93,6 +93,7 @@ type Search struct {
// MetricBlockRef is updated with each Search.NextMetricBlock call.
MetricBlockRef MetricBlockRef
// idb is used for MetricName lookup for the found data blocks.
idb *indexDB
ts tableSearch
@ -143,16 +144,21 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
}
s.reset()
s.idb = storage.idb()
s.tr = tr
s.tfss = tfss
s.deadline = deadline
s.needClosing = true
tsids, err := storage.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
var tsids []TSID
metricIDs, err := s.idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
if err == nil {
err = storage.prefetchMetricNames(qt, tsids, deadline)
tsids, err = s.idb.getTSIDsFromMetricIDs(qt, metricIDs, deadline)
if err == nil {
err = storage.prefetchMetricNames(qt, metricIDs, deadline)
}
}
// It is ok to call Init on error from storage.searchTSIDs.
// It is ok to call Init on non-nil err.
// Init must be called before returning because it will fail
// on Seach.MustClose otherwise.
s.ts.Init(storage.tb, tsids, tr)
@ -161,8 +167,6 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
s.err = err
return 0
}
s.idb = storage.idb()
return len(tsids)
}

View file

@ -50,9 +50,6 @@ type Storage struct {
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
slowRowInserts uint64
slowPerDayIndexInserts uint64
slowMetricNameLoads uint64
@ -459,11 +456,6 @@ type Metrics struct {
AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64
SearchTSIDsConcurrencyLimitReached uint64
SearchTSIDsConcurrencyLimitTimeout uint64
SearchTSIDsConcurrencyCapacity uint64
SearchTSIDsConcurrencyCurrent uint64
SearchDelays uint64
SlowRowInserts uint64
@ -541,11 +533,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
m.SearchTSIDsConcurrencyLimitReached += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitReached)
m.SearchTSIDsConcurrencyLimitTimeout += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitTimeout)
m.SearchTSIDsConcurrencyCapacity = uint64(cap(searchTSIDsConcurrencyCh))
m.SearchTSIDsConcurrencyCurrent = uint64(len(searchTSIDsConcurrencyCh))
m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
@ -1106,27 +1093,26 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration {
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
metricIDs, err := s.idb().searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
if err != nil {
return nil, err
}
if len(tsids) == 0 {
if len(metricIDs) == 0 {
return nil, nil
}
if err = s.prefetchMetricNames(qt, tsids, deadline); err != nil {
if err = s.prefetchMetricNames(qt, metricIDs, deadline); err != nil {
return nil, err
}
idb := s.idb()
metricNames := make([]string, 0, len(tsids))
metricNamesSeen := make(map[string]struct{}, len(tsids))
metricNames := make([]string, 0, len(metricIDs))
metricNamesSeen := make(map[string]struct{}, len(metricIDs))
var metricName []byte
for i := range tsids {
for i, metricID := range metricIDs {
if i&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(deadline); err != nil {
return nil, err
}
}
metricID := tsids[i].MetricID
var err error
metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID)
if err != nil {
@ -1148,75 +1134,25 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
return metricNames, nil
}
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
func (s *Storage) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("search for matching tsids: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
// Limit the number of concurrent goroutines that may search TSIDS in the storage.
// This should prevent from out of memory errors and CPU thrashing when too many
// goroutines call searchTSIDs.
select {
case searchTSIDsConcurrencyCh <- struct{}{}:
default:
// Sleep for a while until giving up
atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1)
currentTime := fasttime.UnixTimestamp()
timeoutSecs := uint64(0)
if currentTime < deadline {
timeoutSecs = deadline - currentTime
}
timeout := time.Second * time.Duration(timeoutSecs)
t := timerpool.Get(timeout)
select {
case searchTSIDsConcurrencyCh <- struct{}{}:
qt.Printf("wait in the queue because %d concurrent search requests are already performed", cap(searchTSIDsConcurrencyCh))
timerpool.Put(t)
case <-t.C:
timerpool.Put(t)
atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1)
return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load",
cap(searchTSIDsConcurrencyCh), timeout.Seconds())
}
}
tsids, err := s.idb().searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
<-searchTSIDsConcurrencyCh
if err != nil {
return nil, fmt.Errorf("error when searching tsids: %w", err)
}
return tsids, nil
}
var (
// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
// is CPU bound and sometimes disk IO bound, so there is no sense in running more
// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
)
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
// prefetchMetricNames pre-fetches metric names for the given metricIDs into metricID->metricName cache.
//
// This should speed-up further searchMetricNameWithCache calls for metricIDs from tsids.
func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, tsids []TSID, deadline uint64) error {
qt = qt.NewChild("prefetch metric names for %d tsids", len(tsids))
// This should speed-up further searchMetricNameWithCache calls for srcMetricIDs from tsids.
func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, srcMetricIDs []uint64, deadline uint64) error {
qt = qt.NewChild("prefetch metric names for %d metricIDs", len(srcMetricIDs))
defer qt.Done()
if len(tsids) == 0 {
if len(srcMetricIDs) == 0 {
qt.Printf("nothing to prefetch")
return nil
}
var metricIDs uint64Sorter
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
for i := range tsids {
tsid := &tsids[i]
metricID := tsid.MetricID
for _, metricID := range srcMetricIDs {
if prefetchedMetricIDs.Has(metricID) {
continue
}
metricIDs = append(metricIDs, metricID)
}
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(tsids))
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
if len(metricIDs) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline.
qt.Printf("skip pre-fetching metric names for low number of metrid ids=%d", len(metricIDs))