lib/storage: substitute searchTSIDs functions with more lightweight searchMetricIDs function

The searchTSIDs function was searching for metricIDs matching the given tag filters
and then was locating the corresponding TSID entries for the found metricIDs.

The TSID entries aren't needed when searching for time series names (aka MetricName),
so this commit removes the unneeded TSID search from the implementation of /api/v1/series API.
This improves performance of /api/v1/series calls.

This commit also improves performance a bit for /api/v1/query and /api/v1/query_range calls,
since now these calls cache small metricIDs instead of big TSID entries
in the indexdb/tagFilters cache (now this cache is named indexdb/tagFiltersToMetricIDs)
without the need to compress the saved entries in order to save cache space.

This commit also removes concurrency limiter during searching for matching time series,
which was introduced in 8f16388428, since the concurrency
for all the read queries is already limited with -search.maxConcurrentRequests command-line flag.

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/648
This commit is contained in:
Aliaksandr Valialkin 2022-10-23 12:15:24 +03:00
parent c92aef39b5
commit 2dd93449d8
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
10 changed files with 341 additions and 391 deletions

View file

@ -1153,7 +1153,7 @@ Below is the output for `/path/to/vmstorage -help`:
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size -storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size -storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -57,10 +57,14 @@ var (
minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data") minFreeDiskSpaceBytes = flagutil.NewBytes("storage.minFreeDiskSpaceBytes", 10e6, "The minimum free disk space at -storageDataPath after which the storage stops accepting new data")
cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") cacheSizeStorageTSID = flagutil.NewBytes("storage.cacheSizeStorageTSID", 0, "Overrides max size for storage/tsid cache. "+
cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") "See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") cacheSizeIndexDBIndexBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBIndexBlocks", 0, "Overrides max size for indexdb/indexBlocks cache. "+
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning") "See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBDataBlocks = flagutil.NewBytes("storage.cacheSizeIndexDBDataBlocks", 0, "Overrides max size for indexdb/dataBlocks cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
cacheSizeIndexDBTagFilters = flagutil.NewBytes("storage.cacheSizeIndexDBTagFilters", 0, "Overrides max size for indexdb/tagFiltersToMetricIDs cache. "+
"See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning")
) )
func main() { func main() {
@ -80,7 +84,7 @@ func main() {
storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset) storage.SetRetentionTimezoneOffset(*retentionTimezoneOffset)
storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N) storage.SetFreeDiskSpaceLimit(minFreeDiskSpaceBytes.N)
storage.SetTSIDCacheSize(cacheSizeStorageTSID.N) storage.SetTSIDCacheSize(cacheSizeStorageTSID.N)
storage.SetTagFilterCacheSize(cacheSizeIndexDBTagFilters.N) storage.SetTagFiltersCacheSize(cacheSizeIndexDBTagFilters.N)
mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N) mergeset.SetIndexBlocksCacheSize(cacheSizeIndexDBIndexBlocks.N)
mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N) mergeset.SetDataBlocksCacheSize(cacheSizeIndexDBDataBlocks.N)
@ -510,19 +514,6 @@ func registerStorageMetrics(strg *storage.Storage) {
return float64(m().AddRowsConcurrencyCurrent) return float64(m().AddRowsConcurrencyCurrent)
}) })
metrics.NewGauge(`vm_concurrent_search_tsids_limit_reached_total`, func() float64 {
return float64(m().SearchTSIDsConcurrencyLimitReached)
})
metrics.NewGauge(`vm_concurrent_search_tsids_limit_timeout_total`, func() float64 {
return float64(m().SearchTSIDsConcurrencyLimitTimeout)
})
metrics.NewGauge(`vm_concurrent_search_tsids_capacity`, func() float64 {
return float64(m().SearchTSIDsConcurrencyCapacity)
})
metrics.NewGauge(`vm_concurrent_search_tsids_current`, func() float64 {
return float64(m().SearchTSIDsConcurrencyCurrent)
})
metrics.NewGauge(`vm_search_delays_total`, func() float64 { metrics.NewGauge(`vm_search_delays_total`, func() float64 {
return float64(m().SearchDelays) return float64(m().SearchDelays)
}) })
@ -626,8 +617,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_entries{type="indexdb/indexBlocks"}`, func() float64 { metrics.NewGauge(`vm_cache_entries{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSize) return float64(idbm().IndexBlocksCacheSize)
}) })
metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_entries{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersCacheSize) return float64(idbm().TagFiltersToMetricIDsCacheSize)
}) })
metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 { metrics.NewGauge(`vm_cache_entries{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSize()) return float64(storage.RegexpCacheSize())
@ -667,8 +658,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="storage/next_day_metric_ids"}`, func() float64 {
return float64(m().NextDayMetricIDCacheSizeBytes) return float64(m().NextDayMetricIDCacheSizeBytes)
}) })
metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeBytes) return float64(idbm().TagFiltersToMetricIDsCacheSizeBytes)
}) })
metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexps"}`, func() float64 { metrics.NewGauge(`vm_cache_size_bytes{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheSizeBytes()) return float64(storage.RegexpCacheSizeBytes())
@ -698,8 +689,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, func() float64 { metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheSizeMaxBytes) return float64(idbm().IndexBlocksCacheSizeMaxBytes)
}) })
metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_size_max_bytes{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersCacheSizeMaxBytes) return float64(idbm().TagFiltersToMetricIDsCacheSizeMaxBytes)
}) })
metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexps"}`, func() float64 { metrics.NewGauge(`vm_cache_size_max_bytes{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMaxSizeBytes()) return float64(storage.RegexpCacheMaxSizeBytes())
@ -726,8 +717,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/indexBlocks"}`, func() float64 { metrics.NewGauge(`vm_cache_requests_total{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheRequests) return float64(idbm().IndexBlocksCacheRequests)
}) })
metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_requests_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersCacheRequests) return float64(idbm().TagFiltersToMetricIDsCacheRequests)
}) })
metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 { metrics.NewGauge(`vm_cache_requests_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheRequests()) return float64(storage.RegexpCacheRequests())
@ -754,8 +745,8 @@ func registerStorageMetrics(strg *storage.Storage) {
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/indexBlocks"}`, func() float64 { metrics.NewGauge(`vm_cache_misses_total{type="indexdb/indexBlocks"}`, func() float64 {
return float64(idbm().IndexBlocksCacheMisses) return float64(idbm().IndexBlocksCacheMisses)
}) })
metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFilters"}`, func() float64 { metrics.NewGauge(`vm_cache_misses_total{type="indexdb/tagFiltersToMetricIDs"}`, func() float64 {
return float64(idbm().TagFiltersCacheMisses) return float64(idbm().TagFiltersToMetricIDsCacheMisses)
}) })
metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 { metrics.NewGauge(`vm_cache_misses_total{type="storage/regexps"}`, func() float64 {
return float64(storage.RegexpCacheMisses()) return float64(storage.RegexpCacheMisses())

View file

@ -15,6 +15,8 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip ## tip
**Update note 1:** the `indexdb/tagFilters` cache type at [/metrics](https://docs.victoriametrics.com/#monitoring) has been renamed to `indexdb/tagFiltersToMetricIDs` in order to make its purpose more clear.
* FEATURE: allow limiting memory usage on a per-query basis with `-search.maxMemoryPerQuery` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203). * FEATURE: allow limiting memory usage on a per-query basis with `-search.maxMemoryPerQuery` command-line flag. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3203).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): drop all the labels with `__` prefix from discovered targets in the same way as Prometheus does according to [this article](https://www.robustperception.io/life-of-a-label/). Previously the following labels were available during [metric-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs): `__address__`, `__scheme__`, `__metrics_path__`, `__scrape_interval__`, `__scrape_timeout__`, `__param_*`. Now these labels are available only during [target-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). This should reduce CPU usage and memory usage for `vmagent` setups, which scrape big number of targets. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): drop all the labels with `__` prefix from discovered targets in the same way as Prometheus does according to [this article](https://www.robustperception.io/life-of-a-label/). Previously the following labels were available during [metric-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#metric_relabel_configs): `__address__`, `__scheme__`, `__metrics_path__`, `__scrape_interval__`, `__scrape_timeout__`, `__param_*`. Now these labels are available only during [target-level relabeling](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#relabel_config). This should reduce CPU usage and memory usage for `vmagent` setups, which scrape big number of targets.
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for metric-level [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling), which can be applied via `metric_relabel_configs` section at [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs), via `-remoteWrite.relabelConfig` or via `-remoteWrite.urlRelabelConfig` command-line options. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve the performance for metric-level [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling), which can be applied via `metric_relabel_configs` section at [scrape_configs](https://docs.victoriametrics.com/sd_configs.html#scrape_configs), via `-remoteWrite.relabelConfig` or via `-remoteWrite.urlRelabelConfig` command-line options.
@ -41,6 +43,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): limit the number of plotted series. This should prevent from browser crashes or hangs when the query returns big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3155). * FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): limit the number of plotted series. This should prevent from browser crashes or hangs when the query returns big number of time series. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3155).
* FEATURE: log error if some environment variables referred at `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent from silent using incorrect config files. * FEATURE: log error if some environment variables referred at `-promscrape.config` via `%{ENV_VAR}` aren't found. This should prevent from silent using incorrect config files.
* FEATURE: immediately shut down VictoriaMetrics apps on the second SIGINT or SIGTERM signal if they couldn't be finished gracefully for some reason after receiving the first signal. * FEATURE: immediately shut down VictoriaMetrics apps on the second SIGINT or SIGTERM signal if they couldn't be finished gracefully for some reason after receiving the first signal.
* FEATURE: improve the performance of [/api/v1/series](https://docs.victoriametrics.com/url-examples.html#apiv1series) endpoint by eliminating loading of unused `TSID` data during the API call.
* BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_requests_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225). * BUGFIX: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): properly merge buckets with identical `le` values, but with different string representation of these values when calculating [histogram_quantile](https://docs.victoriametrics.com/MetricsQL.html#histogram_quantile) and [histogram_share](https://docs.victoriametrics.com/MetricsQL.html#histogram_share). For example, `http_request_duration_seconds_bucket{le="5"}` and `http_requests_duration_seconds_bucket{le="5.0"}`. Such buckets may be returned from distinct targets. Thanks to @647-coder for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3225).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts for sending data to remote storage from `error` to `warn`. The message for about all failed send attempts remains at `error` severity level. * BUGFIX: [vmalert](https://docs.victoriametrics.com/vmalert.html): change severity level for log messages about failed attempts for sending data to remote storage from `error` to `warn`. The message for about all failed send attempts remains at `error` severity level.

View file

@ -1157,7 +1157,7 @@ Below is the output for `/path/to/vmstorage -help`:
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size -storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size -storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -2310,7 +2310,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size -storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size -storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -2313,7 +2313,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/indexBlocks cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeIndexDBTagFilters size -storage.cacheSizeIndexDBTagFilters size
Overrides max size for indexdb/tagFilters cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for indexdb/tagFiltersToMetricIDs cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning
Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0) Supports the following optional suffixes for size values: KB, MB, GB, KiB, MiB, GiB (default 0)
-storage.cacheSizeStorageTSID size -storage.cacheSizeStorageTSID size
Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning Overrides max size for storage/tsid cache. See https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#cache-tuning

View file

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"path/filepath" "path/filepath"
"reflect"
"sort" "sort"
"strconv" "strconv"
"sync" "sync"
@ -97,8 +98,8 @@ type indexDB struct {
extDB *indexDB extDB *indexDB
extDBLock sync.Mutex extDBLock sync.Mutex
// Cache for fast TagFilters -> TSIDs lookup. // Cache for fast TagFilters -> MetricIDs lookup.
tagFiltersCache *workingsetcache.Cache tagFiltersToMetricIDsCache *workingsetcache.Cache
// The parent storage. // The parent storage.
s *Storage s *Storage
@ -110,18 +111,18 @@ type indexDB struct {
indexSearchPool sync.Pool indexSearchPool sync.Pool
} }
var maxTagFilterCacheSize int var maxTagFiltersCacheSize int
// SetTagFilterCacheSize overrides the default size of indexdb/tagFilters cache // SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache
func SetTagFilterCacheSize(size int) { func SetTagFiltersCacheSize(size int) {
maxTagFilterCacheSize = size maxTagFiltersCacheSize = size
} }
func getTagFilterCacheSize() int { func getTagFiltersCacheSize() int {
if maxTagFilterCacheSize <= 0 { if maxTagFiltersCacheSize <= 0 {
return int(float64(memory.Allowed()) / 32) return int(float64(memory.Allowed()) / 32)
} }
return maxTagFilterCacheSize return maxTagFiltersCacheSize
} }
// openIndexDB opens index db from the given path. // openIndexDB opens index db from the given path.
@ -147,8 +148,9 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err) return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)
} }
// Do not persist tagFiltersCache in files, since it is very volatile. // Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
mem := memory.Allowed() mem := memory.Allowed()
tagFiltersCacheSize := getTagFiltersCacheSize()
db := &indexDB{ db := &indexDB{
refCount: 1, refCount: 1,
@ -157,7 +159,7 @@ func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *
tb: tb, tb: tb,
name: name, name: name,
tagFiltersCache: workingsetcache.New(getTagFilterCacheSize()), tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
s: s, s: s,
loopsPerDateTagFilterCache: workingsetcache.New(mem / 128), loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
} }
@ -168,11 +170,11 @@ const noDeadline = 1<<64 - 1
// IndexDBMetrics contains essential metrics for indexDB. // IndexDBMetrics contains essential metrics for indexDB.
type IndexDBMetrics struct { type IndexDBMetrics struct {
TagFiltersCacheSize uint64 TagFiltersToMetricIDsCacheSize uint64
TagFiltersCacheSizeBytes uint64 TagFiltersToMetricIDsCacheSizeBytes uint64
TagFiltersCacheSizeMaxBytes uint64 TagFiltersToMetricIDsCacheSizeMaxBytes uint64
TagFiltersCacheRequests uint64 TagFiltersToMetricIDsCacheRequests uint64
TagFiltersCacheMisses uint64 TagFiltersToMetricIDsCacheMisses uint64
DeletedMetricsCount uint64 DeletedMetricsCount uint64
@ -210,12 +212,12 @@ func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
var cs fastcache.Stats var cs fastcache.Stats
cs.Reset() cs.Reset()
db.tagFiltersCache.UpdateStats(&cs) db.tagFiltersToMetricIDsCache.UpdateStats(&cs)
m.TagFiltersCacheSize += cs.EntriesCount m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount
m.TagFiltersCacheSizeBytes += cs.BytesSize m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize
m.TagFiltersCacheSizeMaxBytes += cs.MaxBytesSize m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize
m.TagFiltersCacheRequests += cs.GetCalls m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls
m.TagFiltersCacheMisses += cs.Misses m.TagFiltersToMetricIDsCacheMisses += cs.Misses
m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len()) m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())
@ -296,10 +298,10 @@ func (db *indexDB) decRef() {
db.SetExtDB(nil) db.SetExtDB(nil)
// Free space occupied by caches owned by db. // Free space occupied by caches owned by db.
db.tagFiltersCache.Stop() db.tagFiltersToMetricIDsCache.Stop()
db.loopsPerDateTagFilterCache.Stop() db.loopsPerDateTagFilterCache.Stop()
db.tagFiltersCache = nil db.tagFiltersToMetricIDsCache = nil
db.s = nil db.s = nil
db.loopsPerDateTagFilterCache = nil db.loopsPerDateTagFilterCache = nil
@ -312,74 +314,36 @@ func (db *indexDB) decRef() {
logger.Infof("indexDB %q has been dropped", tbPath) logger.Infof("indexDB %q has been dropped", tbPath)
} }
func (db *indexDB) getFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]TSID, bool) { var tagBufPool bytesutil.ByteBufferPool
qt = qt.NewChild("search for tsids in tag filters cache")
func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) {
qt = qt.NewChild("search for metricIDs in tag filters cache")
defer qt.Done() defer qt.Done()
compressedBuf := tagBufPool.Get() buf := tagBufPool.Get()
defer tagBufPool.Put(compressedBuf) defer tagBufPool.Put(buf)
compressedBuf.B = db.tagFiltersCache.GetBig(compressedBuf.B[:0], key) buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key)
if len(compressedBuf.B) == 0 { if len(buf.B) == 0 {
qt.Printf("cache miss") qt.Printf("cache miss")
return nil, false return nil, false
} }
if compressedBuf.B[0] == 0 { qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
// Fast path - tsids are stored in uncompressed form. metricIDs, err := unmarshalMetricIDs(nil, buf.B)
qt.Printf("found tsids with size: %d bytes", len(compressedBuf.B))
tsids, err := unmarshalTSIDs(nil, compressedBuf.B[1:])
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
}
qt.Printf("unmarshaled %d tsids", len(tsids))
return tsids, true
}
// Slow path - tsids are stored in compressed form.
qt.Printf("found tsids with compressed size: %d bytes", len(compressedBuf.B))
buf := tagBufPool.Get()
defer tagBufPool.Put(buf)
var err error
buf.B, err = encoding.DecompressZSTD(buf.B[:0], compressedBuf.B[1:])
if err != nil { if err != nil {
logger.Panicf("FATAL: cannot decompress tsids from tagFiltersCache: %s", err) logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
} }
qt.Printf("decompressed tsids to %d bytes", len(buf.B)) qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
tsids, err := unmarshalTSIDs(nil, buf.B) return metricIDs, true
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tsids from tagFiltersCache: %s", err)
}
qt.Printf("unmarshaled %d tsids", len(tsids))
return tsids, true
} }
var tagBufPool bytesutil.ByteBufferPool func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) {
qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs))
func (db *indexDB) putToTagFiltersCache(qt *querytracer.Tracer, tsids []TSID, key []byte) {
qt = qt.NewChild("put %d tsids in cache", len(tsids))
defer qt.Done() defer qt.Done()
if len(tsids) <= 2 {
// Fast path - store small number of tsids in uncompressed form.
// This saves CPU time on compress / decompress.
buf := tagBufPool.Get()
buf.B = append(buf.B[:0], 0)
buf.B = marshalTSIDs(buf.B, tsids)
qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B))
db.tagFiltersCache.SetBig(key, buf.B)
qt.Printf("store %d tsids into cache", len(tsids))
tagBufPool.Put(buf)
return
}
// Slower path - store big number of tsids in compressed form.
// This increases cache capacity.
buf := tagBufPool.Get() buf := tagBufPool.Get()
buf.B = marshalTSIDs(buf.B[:0], tsids) buf.B = marshalMetricIDs(buf.B, metricIDs)
qt.Printf("marshaled %d tsids into %d bytes", len(tsids), len(buf.B)) qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B))
compressedBuf := tagBufPool.Get() db.tagFiltersToMetricIDsCache.SetBig(key, buf.B)
compressedBuf.B = append(compressedBuf.B[:0], 1) qt.Printf("stored %d metricIDs into cache", len(metricIDs))
compressedBuf.B = encoding.CompressZSTDLevel(compressedBuf.B, buf.B, 1)
qt.Printf("compressed %d tsids into %d bytes", len(tsids), len(compressedBuf.B))
tagBufPool.Put(buf) tagBufPool.Put(buf)
db.tagFiltersCache.SetBig(key, compressedBuf.B)
qt.Printf("stored %d compressed tsids into cache", len(tsids))
tagBufPool.Put(compressedBuf)
} }
func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error { func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
@ -488,35 +452,44 @@ func invalidateTagFiltersCache() {
var tagFiltersKeyGen uint64 var tagFiltersKeyGen uint64
func marshalTSIDs(dst []byte, tsids []TSID) []byte { func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
dst = encoding.MarshalUint64(dst, uint64(len(tsids))) dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
for i := range tsids { if len(metricIDs) == 0 {
dst = tsids[i].Marshal(dst) return dst
} }
var buf []byte
sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
sh.Cap = sh.Len
sh.Len = 8 * len(metricIDs)
dst = append(dst, buf...)
return dst return dst
} }
func unmarshalTSIDs(dst []TSID, src []byte) ([]TSID, error) { func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
if len(src)%8 != 0 {
return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src))
}
if len(src) < 8 { if len(src) < 8 {
return dst, fmt.Errorf("cannot unmarshal the number of tsids from %d bytes; require at least %d bytes", len(src), 8) return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src))
} }
n := encoding.UnmarshalUint64(src) n := encoding.UnmarshalUint64(src)
if n > ((1<<64)-1)/8 {
return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, ((1<<64)-1)/8)
}
src = src[8:] src = src[8:]
dstLen := len(dst) if n*8 != uint64(len(src)) {
if nn := dstLen + int(n) - cap(dst); nn > 0 { return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
dst = append(dst[:cap(dst)], make([]TSID, nn)...)
} }
dst = dst[:dstLen+int(n)] if n == 0 {
for i := 0; i < int(n); i++ { return dst, nil
tail, err := dst[dstLen+i].Unmarshal(src)
if err != nil {
return dst, fmt.Errorf("cannot unmarshal tsid #%d out of %d: %w", i, n, err)
}
src = tail
}
if len(src) > 0 {
return dst, fmt.Errorf("non-zero tail left after unmarshaling %d tsids; len(tail)=%d", n, len(src))
} }
var metricIDs []uint64
sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs))
sh.Data = uintptr(unsafe.Pointer(&src[0]))
sh.Cap = sh.Len
sh.Len = len(src) / 8
dst = append(dst, metricIDs...)
return dst, nil return dst, nil
} }
@ -1807,8 +1780,10 @@ func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
return dmis, nil return dmis, nil
} }
// searchTSIDs returns sorted tsids matching the given tfss over the given tr. func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) { qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
if len(tfss) == 0 { if len(tfss) == 0 {
return nil, nil return nil, nil
} }
@ -1816,33 +1791,32 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
tfss = convertToCompositeTagFilterss(tfss) tfss = convertToCompositeTagFilterss(tfss)
} }
qtChild := qt.NewChild("search for tsids in the current indexdb") qtChild := qt.NewChild("search for metricIDs in the current indexdb")
tfKeyBuf := tagFiltersKeyBufPool.Get() tfKeyBuf := tagFiltersKeyBufPool.Get()
defer tagFiltersKeyBufPool.Put(tfKeyBuf) defer tagFiltersKeyBufPool.Put(tfKeyBuf)
tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true) tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
tsids, ok := db.getFromTagFiltersCache(qtChild, tfKeyBuf.B) metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qtChild, tfKeyBuf.B)
if ok { if ok {
// Fast path - tsids found in the cache // Fast path - metricIDs found in the cache
qtChild.Done() qtChild.Done()
return tsids, nil return metricIDs, nil
} }
// Slow path - search for tsids in the db and extDB. // Slow path - search for metricIDs in the db and extDB.
accountID := tfss[0].accountID accountID := tfss[0].accountID
projectID := tfss[0].projectID projectID := tfss[0].projectID
is := db.getIndexSearch(accountID, projectID, deadline) is := db.getIndexSearch(accountID, projectID, deadline)
localTSIDs, err := is.searchTSIDs(qtChild, tfss, tr, maxMetrics) localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
db.putIndexSearch(is) db.putIndexSearch(is)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error when searching for metricIDs in the current indexdb: %s", err)
} }
qtChild.Done() qtChild.Done()
var extTSIDs []TSID var extMetricIDs []uint64
if db.doExtDB(func(extDB *indexDB) { if db.doExtDB(func(extDB *indexDB) {
qtChild := qt.NewChild("search for tsids in the previous indexdb") qtChild := qt.NewChild("search for metricIDs in the previous indexdb")
defer qtChild.Done() defer qtChild.Done()
tfKeyExtBuf := tagFiltersKeyBufPool.Get() tfKeyExtBuf := tagFiltersKeyBufPool.Get()
@ -1850,36 +1824,111 @@ func (db *indexDB) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr Ti
// Data in extDB cannot be changed, so use unversioned keys for tag cache. // Data in extDB cannot be changed, so use unversioned keys for tag cache.
tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false) tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
tsids, ok := extDB.getFromTagFiltersCache(qtChild, tfKeyExtBuf.B) metricIDs, ok := extDB.getMetricIDsFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
if ok { if ok {
extTSIDs = tsids extMetricIDs = metricIDs
return return
} }
is := extDB.getIndexSearch(accountID, projectID, deadline) is := extDB.getIndexSearch(accountID, projectID, deadline)
extTSIDs, err = is.searchTSIDs(qtChild, tfss, tr, maxMetrics) extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
extDB.putIndexSearch(is) extDB.putIndexSearch(is)
extDB.putMetricIDsToTagFiltersCache(qtChild, extMetricIDs, tfKeyExtBuf.B)
sort.Slice(extTSIDs, func(i, j int) bool { return extTSIDs[i].Less(&extTSIDs[j]) })
extDB.putToTagFiltersCache(qtChild, extTSIDs, tfKeyExtBuf.B)
}) { }) {
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("error when searching for metricIDs in the previous indexdb: %s", err)
} }
} }
// Merge localTSIDs with extTSIDs. // Merge localMetricIDs with extMetricIDs.
tsids = mergeTSIDs(localTSIDs, extTSIDs) metricIDs = mergeSortedMetricIDs(localMetricIDs, extMetricIDs)
qt.Printf("merge %d tsids from the current indexdb with %d tsids from the previous indexdb; result: %d tsids", len(localTSIDs), len(extTSIDs), len(tsids)) qt.Printf("merge %d metricIDs from the current indexdb with %d metricIDs from the previous indexdb; result: %d metricIDs",
len(localMetricIDs), len(extMetricIDs), len(metricIDs))
// Store metricIDs in the cache.
db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B)
return metricIDs, nil
}
// mergeSortedMetricIDs merges two ascending-sorted metricID slices into a
// single ascending-sorted slice, dropping duplicate ids that occur in both
// input slices.
//
// Both a and b must be sorted in ascending order.
func mergeSortedMetricIDs(a, b []uint64) []uint64 {
	if len(b) == 0 {
		// Nothing to merge in - return a as is.
		return a
	}
	ia := 0
	ib := 0
	merged := make([]uint64, 0, len(a)+len(b))
	for {
		// Copy the run of items from a which do not exceed the next item from b.
		threshold := b[ib]
		runStart := ia
		for ia < len(a) && a[ia] <= threshold {
			ia++
		}
		merged = append(merged, a[runStart:ia]...)
		// Skip items in b equal to the last merged item in order to de-duplicate
		// ids shared by both slices.
		if len(merged) > 0 {
			last := merged[len(merged)-1]
			for ib < len(b) && b[ib] == last {
				ib++
			}
		}
		if ia == len(a) {
			// a is exhausted - the remaining tail of b is already sorted.
			return append(merged, b[ib:]...)
		}
		// All the smallest remaining items are now in b - swap the roles of a and b.
		a, b = b, a
		ia, ib = ib, ia
	}
}
func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, accountID, projectID uint32, metricIDs []uint64, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("obtain tsids from %d metricIDs", len(metricIDs))
defer qt.Done()
if len(metricIDs) == 0 {
return nil, nil
}
tsids := make([]TSID, len(metricIDs))
is := db.getIndexSearch(accountID, projectID, deadline)
defer db.putIndexSearch(is)
i := 0
for loopsPaceLimiter, metricID := range metricIDs {
if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
return nil, err
}
}
// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
// than scanning the mergeset if it contains a lot of metricIDs.
tsid := &tsids[i]
err := is.db.getFromMetricIDCache(tsid, metricID)
if err == nil {
// Fast path - the tsid for metricID is found in cache.
i++
continue
}
if err != io.EOF {
return nil, err
}
if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
if err == io.EOF {
// Cannot find TSID for the given metricID.
// This may be the case on incomplete indexDB
// due to snapshot or due to unflushed entries.
// Just increment errors counter and skip it.
atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
continue
}
return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
}
is.db.putToMetricIDCache(metricID, tsid)
i++
}
tsids = tsids[:i]
qt.Printf("load %d tsids from %d metricIDs", len(tsids), len(metricIDs))
// Sort the found tsids, since they must be passed to TSID search // Sort the found tsids, since they must be passed to TSID search
// in the sorted order. // in the sorted order.
sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) }) sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
qt.Printf("sort %d tsids", len(tsids)) qt.Printf("sort %d tsids", len(tsids))
return tsids, nil
// Store TSIDs in the cache.
db.putToTagFiltersCache(qt, tsids, tfKeyBuf.B)
return tsids, err
} }
var tagFiltersKeyBufPool bytesutil.ByteBufferPool var tagFiltersKeyBufPool bytesutil.ByteBufferPool
@ -1954,30 +2003,6 @@ func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, er
return dst, nil return dst, nil
} }
// mergeTSIDs returns the union of a and b de-duplicated by TSID.MetricID.
// The order of the returned tsids is unspecified (map iteration order);
// callers are expected to sort the result themselves when needed.
func mergeTSIDs(a, b []TSID) []TSID {
	// Make a the bigger slice, so the map below is pre-sized for the worst case.
	if len(b) > len(a) {
		a, b = b, a
	}
	if len(b) == 0 {
		return a
	}
	byMetricID := make(map[uint64]TSID, len(a))
	for _, tsid := range a {
		byMetricID[tsid.MetricID] = tsid
	}
	for _, tsid := range b {
		byMetricID[tsid.MetricID] = tsid
	}
	merged := make([]TSID, 0, len(byMetricID))
	for _, tsid := range byMetricID {
		merged = append(merged, tsid)
	}
	return merged
}
func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) { func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
ts := &is.ts ts := &is.ts
kb := &is.kb kb := &is.kb
@ -2006,66 +2031,6 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
return true, nil return true, nil
} }
// searchTSIDs returns TSIDs matching the given tfss over the given tr.
//
// The returned tsids are NOT sorted - the caller is expected to sort them.
// metricIDs without a corresponding TSID entry are skipped, so the result
// may contain fewer entries than there are matching metricIDs.
func (is *indexSearch) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
	ok, err := is.containsTimeRange(tr)
	if err != nil {
		return nil, err
	}
	if !ok {
		// Fast path - the index doesn't contain data for the given tr.
		return nil, nil
	}
	metricIDs, err := is.searchMetricIDs(qt, tfss, tr, maxMetrics)
	if err != nil {
		return nil, err
	}
	if len(metricIDs) == 0 {
		// Nothing found.
		return nil, nil
	}
	// Obtain TSID values for the given metricIDs.
	tsids := make([]TSID, len(metricIDs))
	// i tracks the number of successfully resolved tsids.
	i := 0
	for loopsPaceLimiter, metricID := range metricIDs {
		// Periodically yield to higher-priority inserts and check the deadline.
		if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return nil, err
			}
		}
		// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
		// than scanning the mergeset if it contains a lot of metricIDs.
		tsid := &tsids[i]
		err := is.db.getFromMetricIDCache(tsid, metricID)
		if err == nil {
			// Fast path - the tsid for metricID is found in cache.
			i++
			continue
		}
		if err != io.EOF {
			return nil, err
		}
		if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
			if err == io.EOF {
				// Cannot find TSID for the given metricID.
				// This may be the case on incomplete indexDB
				// due to snapshot or due to unflushed entries.
				// Just increment errors counter and skip it.
				atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
				continue
			}
			return nil, fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
		}
		// Cache the freshly loaded tsid for subsequent lookups.
		is.db.putToMetricIDCache(metricID, tsid)
		i++
	}
	tsids = tsids[:i]
	qt.Printf("load %d tsids from %d metric ids", len(tsids), len(metricIDs))
	// Do not sort the found tsids, since they will be sorted later.
	return tsids, nil
}
func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error { func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
// There is no need in checking for deleted metricIDs here, since they // There is no need in checking for deleted metricIDs here, since they
// must be checked by the caller. // must be checked by the caller.
@ -2318,6 +2283,14 @@ func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer,
} }
func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) { func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
ok, err := is.containsTimeRange(tr)
if err != nil {
return nil, err
}
if !ok {
// Fast path - the index doesn't contain data for the given tr.
return nil, nil
}
metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics) metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -22,6 +22,77 @@ import (
"github.com/VictoriaMetrics/fastcache" "github.com/VictoriaMetrics/fastcache"
) )
// TestMarshalUnmarshalMetricIDs verifies that unmarshalMetricIDs restores
// exactly the slice passed to marshalMetricIDs.
func TestMarshalUnmarshalMetricIDs(t *testing.T) {
	checkRoundtrip := func(metricIDs []uint64) {
		t.Helper()
		marshaled := marshalMetricIDs(nil, metricIDs)
		unmarshaled, err := unmarshalMetricIDs(nil, marshaled)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		if !reflect.DeepEqual(unmarshaled, metricIDs) {
			t.Fatalf("unexpected metricIDs after unmarshaling;\ngot\n%d\nwant\n%d", unmarshaled, metricIDs)
		}
	}
	checkRoundtrip(nil)
	checkRoundtrip([]uint64{1})
	checkRoundtrip([]uint64{1234, 678932943, 843289893843})
}
// TestMergeSortedMetricIDs verifies that mergeSortedMetricIDs produces the
// sorted de-duplicated union of its args, independently of the arg order.
func TestMergeSortedMetricIDs(t *testing.T) {
	f := func(a, b []uint64) {
		t.Helper()
		// Build the expected result: the sorted union of a and b without duplicates.
		seen := make(map[uint64]bool)
		var resultExpected []uint64
		for _, src := range [][]uint64{a, b} {
			for _, v := range src {
				if seen[v] {
					continue
				}
				seen[v] = true
				resultExpected = append(resultExpected, v)
			}
		}
		sort.Slice(resultExpected, func(i, j int) bool {
			return resultExpected[i] < resultExpected[j]
		})
		// The merge must be symmetric in its arguments.
		for _, args := range [][2][]uint64{{a, b}, {b, a}} {
			result := mergeSortedMetricIDs(args[0], args[1])
			if !reflect.DeepEqual(result, resultExpected) {
				t.Fatalf("unexpected result for mergeSortedMetricIDs(%d, %d); got\n%d\nwant\n%d", args[0], args[1], result, resultExpected)
			}
		}
	}
	f(nil, nil)
	f([]uint64{1}, nil)
	f(nil, []uint64{23})
	f([]uint64{1234}, []uint64{0})
	f([]uint64{1}, []uint64{1})
	f([]uint64{1}, []uint64{1, 2, 3})
	f([]uint64{1, 2, 3}, []uint64{1, 2, 3})
	f([]uint64{1, 2, 3}, []uint64{2, 3})
	f([]uint64{0, 1, 7, 8, 9, 13, 20}, []uint64{1, 2, 7, 13, 15})
	f([]uint64{0, 1, 2, 3, 4}, []uint64{5, 6, 7, 8})
	f([]uint64{0, 1, 2, 3, 4}, []uint64{4, 5, 6, 7, 8})
	f([]uint64{0, 1, 2, 3, 4}, []uint64{3, 4, 5, 6, 7, 8})
	f([]uint64{2, 3, 4}, []uint64{1, 5, 6, 7})
	f([]uint64{2, 3, 4}, []uint64{1, 2, 5, 6, 7})
	f([]uint64{2, 3, 4}, []uint64{1, 2, 4, 5, 6, 7})
	f([]uint64{2, 3, 4}, []uint64{1, 2, 3, 4, 5, 6, 7})
	f([]uint64{2, 3, 4, 6}, []uint64{1, 2, 3, 4, 5, 6, 7})
	f([]uint64{2, 3, 4, 6, 7}, []uint64{1, 2, 3, 4, 5, 6, 7})
	f([]uint64{2, 3, 4, 6, 7, 8}, []uint64{1, 2, 3, 4, 5, 6, 7})
	f([]uint64{2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
	f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{1, 2, 3, 4, 5, 6, 7})
	f([]uint64{1, 2, 3, 4, 6, 7, 8, 9}, []uint64{2, 3, 4, 5, 6, 7})
}
func TestReverseBytes(t *testing.T) { func TestReverseBytes(t *testing.T) {
f := func(s, resultExpected string) { f := func(s, resultExpected string) {
t.Helper() t.Helper()
@ -431,47 +502,6 @@ func TestRemoveDuplicateMetricIDs(t *testing.T) {
f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2}) f([]uint64{0, 1, 2, 2}, []uint64{0, 1, 2})
} }
// TestMarshalUnmarshalTSIDs verifies that marshalTSIDs/unmarshalTSIDs
// round-trip TSID slices, including marshaling into a non-empty dst buffer
// and unmarshaling into a non-empty dst slice.
func TestMarshalUnmarshalTSIDs(t *testing.T) {
	f := func(tsids []TSID) {
		t.Helper()
		value := marshalTSIDs(nil, tsids)
		tsidsGot, err := unmarshalTSIDs(nil, value)
		if err != nil {
			t.Fatalf("cannot unmarshal tsids: %s", err)
		}
		// An empty input may unmarshal to nil, so compare lengths first.
		if len(tsids) == 0 && len(tsidsGot) != 0 || len(tsids) > 0 && !reflect.DeepEqual(tsids, tsidsGot) {
			t.Fatalf("unexpected tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot, tsids)
		}
		// Try marshaling with prefix
		prefix := []byte("prefix")
		valueExt := marshalTSIDs(prefix, tsids)
		if !bytes.Equal(valueExt[:len(prefix)], prefix) {
			t.Fatalf("unexpected prefix after marshaling;\ngot\n%X\nwant\n%X", valueExt[:len(prefix)], prefix)
		}
		if !bytes.Equal(valueExt[len(prefix):], value) {
			t.Fatalf("unexpected prefixed marshaled value;\ngot\n%X\nwant\n%X", valueExt[len(prefix):], value)
		}
		// Try unmarshaling with prefix
		tsidPrefix := []TSID{{MetricID: 123}, {JobID: 456}}
		tsidsGot, err = unmarshalTSIDs(tsidPrefix, value)
		if err != nil {
			t.Fatalf("cannot unmarshal prefixed tsids: %s", err)
		}
		// The pre-existing dst entries must be left intact.
		if !reflect.DeepEqual(tsidsGot[:len(tsidPrefix)], tsidPrefix) {
			t.Fatalf("unexpected tsid prefix\ngot\n%+v\nwant\n%+v", tsidsGot[:len(tsidPrefix)], tsidPrefix)
		}
		if len(tsids) == 0 && len(tsidsGot) != len(tsidPrefix) || len(tsids) > 0 && !reflect.DeepEqual(tsidsGot[len(tsidPrefix):], tsids) {
			t.Fatalf("unexpected prefixed tsids unmarshaled\ngot\n%+v\nwant\n%+v", tsidsGot[len(tsidPrefix):], tsids)
		}
	}
	f(nil)
	f([]TSID{{MetricID: 123}})
	f([]TSID{{JobID: 34}, {MetricID: 2343}, {InstanceID: 243321}})
}
func TestIndexDBOpenClose(t *testing.T) { func TestIndexDBOpenClose(t *testing.T) {
s := newTestStorage() s := newTestStorage()
defer stopTestStorage(s) defer stopTestStorage(s)
@ -881,7 +911,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, false); err != nil { if err := tfs.Add(nil, nil, true, false); err != nil {
return fmt.Errorf("cannot add no-op negative filter: %w", err) return fmt.Errorf("cannot add no-op negative filter: %w", err)
} }
tsidsFound, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err) return fmt.Errorf("cannot search by exact tag filter: %w", err)
} }
@ -890,7 +920,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
// Verify tag cache. // Verify tag cache.
tsidsCached, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsCached, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by exact tag filter: %w", err) return fmt.Errorf("cannot search by exact tag filter: %w", err)
} }
@ -902,7 +932,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil { if err := tfs.Add(nil, mn.MetricGroup, true, false); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err) return fmt.Errorf("cannot search by exact tag filter with full negative: %w", err)
} }
@ -920,7 +950,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, []byte(re), false, true); err != nil { if err := tfs.Add(nil, []byte(re), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for Graphite wildcard") return fmt.Errorf("cannot create regexp tag filter for Graphite wildcard")
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err) return fmt.Errorf("cannot search by regexp tag filter for Graphite wildcard: %w", err)
} }
@ -937,7 +967,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add([]byte("non-existent-tag"), []byte("foo|"), false, true); err != nil { if err := tfs.Add([]byte("non-existent-tag"), []byte("foo|"), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for non-existing tag: %w", err) return fmt.Errorf("cannot create regexp tag filter for non-existing tag: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search with a filter matching empty tag: %w", err) return fmt.Errorf("cannot search with a filter matching empty tag: %w", err)
} }
@ -957,7 +987,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add([]byte("non-existent-tag2"), []byte("bar|"), false, true); err != nil { if err := tfs.Add([]byte("non-existent-tag2"), []byte("bar|"), false, true); err != nil {
return fmt.Errorf("cannot create regexp tag filter for non-existing tag2: %w", err) return fmt.Errorf("cannot create regexp tag filter for non-existing tag2: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search with multipel filters matching empty tags: %w", err) return fmt.Errorf("cannot search with multipel filters matching empty tags: %w", err)
} }
@ -985,7 +1015,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, true, true); err != nil { if err := tfs.Add(nil, nil, true, true); err != nil {
return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err) return fmt.Errorf("cannot add no-op negative filter with regexp: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by regexp tag filter: %w", err) return fmt.Errorf("cannot search by regexp tag filter: %w", err)
} }
@ -995,7 +1025,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil { if err := tfs.Add(nil, mn.MetricGroup, true, true); err != nil {
return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err) return fmt.Errorf("cannot add negative filter for zeroing search results: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err) return fmt.Errorf("cannot search by regexp tag filter with full negative: %w", err)
} }
@ -1011,7 +1041,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil { if err := tfs.Add(nil, mn.MetricGroup, false, true); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err) return fmt.Errorf("cannot create tag filter for MetricGroup matching zero results: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search by non-existing tag filter: %w", err) return fmt.Errorf("cannot search by non-existing tag filter: %w", err)
} }
@ -1025,9 +1055,9 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
continue continue
} }
// Search with empty filter. It should match all the results for (accountID, projectID). // Search with empty filter. It should match all the results.
tfs.Reset(mn.AccountID, mn.ProjectID) tfs.Reset(mn.AccountID, mn.ProjectID)
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for common prefix: %w", err) return fmt.Errorf("cannot search for common prefix: %w", err)
} }
@ -1040,7 +1070,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs.Add(nil, nil, false, false); err != nil { if err := tfs.Add(nil, nil, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err) return fmt.Errorf("cannot create tag filter for empty metricGroup: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err) return fmt.Errorf("cannot search for empty metricGroup: %w", err)
} }
@ -1057,7 +1087,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil { if err := tfs2.Add(nil, mn.MetricGroup, false, false); err != nil {
return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err) return fmt.Errorf("cannot create tag filter for MetricGroup: %w", err)
} }
tsidsFound, err = db.searchTSIDs(nil, []*TagFilters{tfs1, tfs2}, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, []*TagFilters{tfs1, tfs2}, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for empty metricGroup: %w", err) return fmt.Errorf("cannot search for empty metricGroup: %w", err)
} }
@ -1066,7 +1096,7 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
} }
// Verify empty tfss // Verify empty tfss
tsidsFound, err = db.searchTSIDs(nil, nil, tr, 1e5, noDeadline) tsidsFound, err = searchTSIDsInTest(db, nil, tr)
if err != nil { if err != nil {
return fmt.Errorf("cannot search for nil tfss: %w", err) return fmt.Errorf("cannot search for nil tfss: %w", err)
} }
@ -1078,6 +1108,22 @@ func testIndexDBCheckTSIDByName(db *indexDB, mns []MetricName, tsids []TSID, isC
return nil return nil
} }
// searchTSIDsInTest emulates the removed db.searchTSIDs by searching for
// matching metricIDs and then resolving them to TSIDs.
func searchTSIDsInTest(db *indexDB, tfss []*TagFilters, tr TimeRange) ([]TSID, error) {
	metricIDs, err := db.searchMetricIDs(nil, tfss, tr, 1e5, noDeadline)
	if err != nil {
		return nil, err
	}
	if len(tfss) == 0 {
		// Empty tag filters must match nothing.
		// Note: the original error message said "non-empty tfss", which
		// contradicted the len(tfss) == 0 condition above.
		if len(metricIDs) > 0 {
			return nil, fmt.Errorf("expecting empty metricIDs for empty tfss; got %d metricIDs", len(metricIDs))
		}
		return nil, nil
	}
	// All the tfss share the same (accountID, projectID).
	accountID := tfss[0].accountID
	projectID := tfss[0].projectID
	return db.getTSIDsFromMetricIDs(nil, accountID, projectID, metricIDs, noDeadline)
}
func testHasTSID(tsids []TSID, tsid *TSID) bool { func testHasTSID(tsids []TSID, tsid *TSID) bool {
for i := range tsids { for i := range tsids {
if tsids[i] == *tsid { if tsids[i] == *tsid {
@ -1831,7 +1877,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MinTimestamp: int64(now - 2*msecPerHour - 1), MinTimestamp: int64(now - 2*msecPerHour - 1),
MaxTimestamp: int64(now), MaxTimestamp: int64(now),
} }
matchedTSIDs, err := db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline) matchedTSIDs, err := searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
t.Fatalf("error searching tsids: %v", err) t.Fatalf("error searching tsids: %v", err)
} }
@ -1885,7 +1931,7 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
MaxTimestamp: int64(now), MaxTimestamp: int64(now),
} }
matchedTSIDs, err = db.searchTSIDs(nil, []*TagFilters{tfs}, tr, 10000, noDeadline) matchedTSIDs, err = searchTSIDsInTest(db, []*TagFilters{tfs}, tr)
if err != nil { if err != nil {
t.Fatalf("error searching tsids: %v", err) t.Fatalf("error searching tsids: %v", err)
} }

View file

@ -129,6 +129,7 @@ type Search struct {
// MetricBlockRef is updated with each Search.NextMetricBlock call. // MetricBlockRef is updated with each Search.NextMetricBlock call.
MetricBlockRef MetricBlockRef MetricBlockRef MetricBlockRef
// idb is used for MetricName lookup for the found data blocks.
idb *indexDB idb *indexDB
ts tableSearch ts tableSearch
@ -179,16 +180,23 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
} }
s.reset() s.reset()
s.idb = storage.idb()
s.tr = tr s.tr = tr
s.tfss = tfss s.tfss = tfss
s.deadline = deadline s.deadline = deadline
s.needClosing = true s.needClosing = true
tsids, err := storage.searchTSIDs(qt, tfss, tr, maxMetrics, deadline) var tsids []TSID
if err == nil { metricIDs, err := s.idb.searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
err = storage.prefetchMetricNames(qt, tsids, deadline) if err == nil && len(metricIDs) > 0 && len(tfss) > 0 {
accountID := tfss[0].accountID
projectID := tfss[0].projectID
tsids, err = s.idb.getTSIDsFromMetricIDs(qt, accountID, projectID, metricIDs, deadline)
if err == nil {
err = storage.prefetchMetricNames(qt, accountID, projectID, metricIDs, deadline)
}
} }
// It is ok to call Init on error from storage.searchTSIDs. // It is ok to call Init on non-nil err.
// Init must be called before returning because it will fail // Init must be called before returning because it will fail
// on Seach.MustClose otherwise. // on Seach.MustClose otherwise.
s.ts.Init(storage.tb, tsids, tr) s.ts.Init(storage.tb, tsids, tr)
@ -197,8 +205,6 @@ func (s *Search) Init(qt *querytracer.Tracer, storage *Storage, tfss []*TagFilte
s.err = err s.err = err
return 0 return 0
} }
s.idb = storage.idb()
return len(tsids) return len(tsids)
} }

View file

@ -50,9 +50,6 @@ type Storage struct {
addRowsConcurrencyLimitTimeout uint64 addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64 addRowsConcurrencyDroppedRows uint64
searchTSIDsConcurrencyLimitReached uint64
searchTSIDsConcurrencyLimitTimeout uint64
slowRowInserts uint64 slowRowInserts uint64
slowPerDayIndexInserts uint64 slowPerDayIndexInserts uint64
slowMetricNameLoads uint64 slowMetricNameLoads uint64
@ -474,11 +471,6 @@ type Metrics struct {
AddRowsConcurrencyCapacity uint64 AddRowsConcurrencyCapacity uint64
AddRowsConcurrencyCurrent uint64 AddRowsConcurrencyCurrent uint64
SearchTSIDsConcurrencyLimitReached uint64
SearchTSIDsConcurrencyLimitTimeout uint64
SearchTSIDsConcurrencyCapacity uint64
SearchTSIDsConcurrencyCurrent uint64
SearchDelays uint64 SearchDelays uint64
SlowRowInserts uint64 SlowRowInserts uint64
@ -556,11 +548,6 @@ func (s *Storage) UpdateMetrics(m *Metrics) {
m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh)) m.AddRowsConcurrencyCapacity = uint64(cap(addRowsConcurrencyCh))
m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh)) m.AddRowsConcurrencyCurrent = uint64(len(addRowsConcurrencyCh))
m.SearchTSIDsConcurrencyLimitReached += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitReached)
m.SearchTSIDsConcurrencyLimitTimeout += atomic.LoadUint64(&s.searchTSIDsConcurrencyLimitTimeout)
m.SearchTSIDsConcurrencyCapacity = uint64(cap(searchTSIDsConcurrencyCh))
m.SearchTSIDsConcurrencyCurrent = uint64(len(searchTSIDsConcurrencyCh))
m.SearchDelays = storagepacelimiter.Search.DelaysTotal() m.SearchDelays = storagepacelimiter.Search.DelaysTotal()
m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts) m.SlowRowInserts += atomic.LoadUint64(&s.slowRowInserts)
@ -1169,29 +1156,28 @@ func nextRetentionDuration(retentionMsecs int64) time.Duration {
func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) { func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]string, error) {
qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr) qt = qt.NewChild("search for matching metric names: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done() defer qt.Done()
tsids, err := s.searchTSIDs(qt, tfss, tr, maxMetrics, deadline) metricIDs, err := s.idb().searchMetricIDs(qt, tfss, tr, maxMetrics, deadline)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(tsids) == 0 { if len(metricIDs) == 0 || len(tfss) == 0 {
return nil, nil return nil, nil
} }
if err = s.prefetchMetricNames(qt, tsids, deadline); err != nil { accountID := tfss[0].accountID
projectID := tfss[0].projectID
if err = s.prefetchMetricNames(qt, accountID, projectID, metricIDs, deadline); err != nil {
return nil, err return nil, err
} }
accountID := tsids[0].AccountID
projectID := tsids[0].ProjectID
idb := s.idb() idb := s.idb()
metricNames := make([]string, 0, len(tsids)) metricNames := make([]string, 0, len(metricIDs))
metricNamesSeen := make(map[string]struct{}, len(tsids)) metricNamesSeen := make(map[string]struct{}, len(metricIDs))
var metricName []byte var metricName []byte
for i := range tsids { for i, metricID := range metricIDs {
if i&paceLimiterSlowIterationsMask == 0 { if i&paceLimiterSlowIterationsMask == 0 {
if err := checkSearchDeadlineAndPace(deadline); err != nil { if err := checkSearchDeadlineAndPace(deadline); err != nil {
return nil, err return nil, err
} }
} }
metricID := tsids[i].MetricID
var err error var err error
metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID, accountID, projectID) metricName, err = idb.searchMetricNameWithCache(metricName[:0], metricID, accountID, projectID)
if err != nil { if err != nil {
@ -1213,82 +1199,27 @@ func (s *Storage) SearchMetricNames(qt *querytracer.Tracer, tfss []*TagFilters,
return metricNames, nil return metricNames, nil
} }
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr. // prefetchMetricNames pre-fetches metric names for the given metricIDs into metricID->metricName cache.
func (s *Storage) searchTSIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]TSID, error) {
qt = qt.NewChild("search for matching tsids: filters=%s, timeRange=%s", tfss, &tr)
defer qt.Done()
// Do not cache tfss -> tsids here, since the caching is performed
// on idb level.
// Limit the number of concurrent goroutines that may search TSIDS in the storage.
// This should prevent from out of memory errors and CPU thrashing when too many
// goroutines call searchTSIDs.
select {
case searchTSIDsConcurrencyCh <- struct{}{}:
default:
// Sleep for a while until giving up
atomic.AddUint64(&s.searchTSIDsConcurrencyLimitReached, 1)
currentTime := fasttime.UnixTimestamp()
timeoutSecs := uint64(0)
if currentTime < deadline {
timeoutSecs = deadline - currentTime
}
timeout := time.Second * time.Duration(timeoutSecs)
t := timerpool.Get(timeout)
select {
case searchTSIDsConcurrencyCh <- struct{}{}:
qt.Printf("wait in the queue because %d concurrent search requests are already performed", cap(searchTSIDsConcurrencyCh))
timerpool.Put(t)
case <-t.C:
timerpool.Put(t)
atomic.AddUint64(&s.searchTSIDsConcurrencyLimitTimeout, 1)
return nil, fmt.Errorf("cannot search for tsids, since more than %d concurrent searches are performed during %.3f secs; add more CPUs or reduce query load",
cap(searchTSIDsConcurrencyCh), timeout.Seconds())
}
}
tsids, err := s.idb().searchTSIDs(qt, tfss, tr, maxMetrics, deadline)
<-searchTSIDsConcurrencyCh
if err != nil {
return nil, fmt.Errorf("error when searching tsids: %w", err)
}
return tsids, nil
}
var (
// Limit the concurrency for TSID searches to GOMAXPROCS*2, since this operation
// is CPU bound and sometimes disk IO bound, so there is no sense in running more
// than GOMAXPROCS*2 concurrent goroutines for TSID searches.
searchTSIDsConcurrencyCh = make(chan struct{}, cgroup.AvailableCPUs()*2)
)
// prefetchMetricNames pre-fetches metric names for the given tsids into metricID->metricName cache.
// //
// It is expected that all the tsids have the same (accountID, projectID) // It is expected that all the metricIDs belong to the same (accountID, projectID)
// //
// This should speed up further searchMetricNameWithCache calls for metricIDs from tsids. // This should speed up further searchMetricNameWithCache calls for the given srcMetricIDs.
func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, tsids []TSID, deadline uint64) error { func (s *Storage) prefetchMetricNames(qt *querytracer.Tracer, accountID, projectID uint32, srcMetricIDs []uint64, deadline uint64) error {
qt = qt.NewChild("prefetch metric names for %d tsids", len(tsids)) qt = qt.NewChild("prefetch metric names for %d metricIDs", len(srcMetricIDs))
defer qt.Done() defer qt.Done()
if len(tsids) == 0 { if len(srcMetricIDs) == 0 {
qt.Printf("nothing to prefetch") qt.Printf("nothing to prefetch")
return nil return nil
} }
accountID := tsids[0].AccountID
projectID := tsids[0].ProjectID
var metricIDs uint64Sorter var metricIDs uint64Sorter
prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set) prefetchedMetricIDs := s.prefetchedMetricIDs.Load().(*uint64set.Set)
for i := range tsids { for _, metricID := range srcMetricIDs {
tsid := &tsids[i]
if tsid.AccountID != accountID || tsid.ProjectID != projectID {
logger.Panicf("BUG: unexpected (accountID, projectID) in tsid=%#v; want accountID=%d, projectID=%d", tsid, accountID, projectID)
}
metricID := tsid.MetricID
if prefetchedMetricIDs.Has(metricID) { if prefetchedMetricIDs.Has(metricID) {
continue continue
} }
metricIDs = append(metricIDs, metricID) metricIDs = append(metricIDs, metricID)
} }
qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(tsids)) qt.Printf("%d out of %d metric names must be pre-fetched", len(metricIDs), len(srcMetricIDs))
if len(metricIDs) < 500 { if len(metricIDs) < 500 {
// It is cheaper to skip pre-fetching and obtain metricNames inline. // It is cheaper to skip pre-fetching and obtain metricNames inline.
qt.Printf("skip pre-fetching metric names for low number of metrid ids=%d", len(metricIDs)) qt.Printf("skip pre-fetching metric names for low number of metrid ids=%d", len(metricIDs))