mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/storage: share tsids across all the partSearch instances
This should reduce memory usage when big number of time series matches the given query.
This commit is contained in:
parent
4e26ad869b
commit
c9063ece66
5 changed files with 22 additions and 6 deletions
|
@ -925,7 +925,7 @@ func (is *indexSearch) loadDeletedMetricIDs() (map[uint64]struct{}, error) {
|
||||||
return dmis, nil
|
return dmis, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchTSIDs returns tsids matching the given tfss over the given tr.
|
// searchTSIDs returns sorted tsids matching the given tfss over the given tr.
|
||||||
func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
func (db *indexDB) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
||||||
if len(tfss) == 0 {
|
if len(tfss) == 0 {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|
|
@ -3,7 +3,9 @@ package storage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"os"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||||
|
@ -49,7 +51,7 @@ type partSearch struct {
|
||||||
func (ps *partSearch) reset() {
|
func (ps *partSearch) reset() {
|
||||||
ps.Block.Reset()
|
ps.Block.Reset()
|
||||||
ps.p = nil
|
ps.p = nil
|
||||||
ps.tsids = ps.tsids[:0]
|
ps.tsids = nil
|
||||||
ps.tsidIdx = 0
|
ps.tsidIdx = 0
|
||||||
ps.fetchData = true
|
ps.fetchData = true
|
||||||
ps.metaindex = nil
|
ps.metaindex = nil
|
||||||
|
@ -64,16 +66,24 @@ func (ps *partSearch) reset() {
|
||||||
ps.err = nil
|
ps.err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var isInTest = func() bool {
|
||||||
|
return strings.HasSuffix(os.Args[0], ".test")
|
||||||
|
}()
|
||||||
|
|
||||||
// Init initializes the ps with the given p, tsids and tr.
|
// Init initializes the ps with the given p, tsids and tr.
|
||||||
|
//
|
||||||
|
// tsids must be sorted.
|
||||||
|
// tsids cannot be modified after the Init call, since it is owned by ps.
|
||||||
func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
|
func (ps *partSearch) Init(p *part, tsids []TSID, tr TimeRange, fetchData bool) {
|
||||||
ps.reset()
|
ps.reset()
|
||||||
ps.p = p
|
ps.p = p
|
||||||
|
|
||||||
if p.ph.MinTimestamp <= tr.MaxTimestamp && p.ph.MaxTimestamp >= tr.MinTimestamp {
|
if p.ph.MinTimestamp <= tr.MaxTimestamp && p.ph.MaxTimestamp >= tr.MinTimestamp {
|
||||||
if !sort.SliceIsSorted(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) }) {
|
if isInTest && !sort.SliceIsSorted(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) }) {
|
||||||
logger.Panicf("BUG: tsids must be sorted; got %+v", tsids)
|
logger.Panicf("BUG: tsids must be sorted; got %+v", tsids)
|
||||||
}
|
}
|
||||||
ps.tsids = append(ps.tsids[:0], tsids...)
|
// take ownership of of tsids.
|
||||||
|
ps.tsids = tsids
|
||||||
}
|
}
|
||||||
ps.tr = tr
|
ps.tr = tr
|
||||||
ps.fetchData = fetchData
|
ps.fetchData = fetchData
|
||||||
|
|
|
@ -55,7 +55,10 @@ func (pts *partitionSearch) reset() {
|
||||||
|
|
||||||
// Init initializes the search in the given partition for the given tsid and tr.
|
// Init initializes the search in the given partition for the given tsid and tr.
|
||||||
//
|
//
|
||||||
// MustClose must be called when partition search is done.
|
// tsids must be sorted.
|
||||||
|
// tsids cannot be modified after the Init call, since it is owned by pts.
|
||||||
|
//
|
||||||
|
/// MustClose must be called when partition search is done.
|
||||||
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
|
func (pts *partitionSearch) Init(pt *partition, tsids []TSID, tr TimeRange, fetchData bool) {
|
||||||
if pts.needClosing {
|
if pts.needClosing {
|
||||||
logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
|
logger.Panicf("BUG: missing partitionSearch.MustClose call before the next call to Init")
|
||||||
|
|
|
@ -579,7 +579,7 @@ func nextRetentionDuration(retentionMonths int) time.Duration {
|
||||||
return deadline.Sub(t)
|
return deadline.Sub(t)
|
||||||
}
|
}
|
||||||
|
|
||||||
// searchTSIDs returns TSIDs for the given tfss and the given tr.
|
// searchTSIDs returns sorted TSIDs for the given tfss and the given tr.
|
||||||
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
func (s *Storage) searchTSIDs(tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]TSID, error) {
|
||||||
// Do not cache tfss -> tsids here, since the caching is performed
|
// Do not cache tfss -> tsids here, since the caching is performed
|
||||||
// on idb level.
|
// on idb level.
|
||||||
|
|
|
@ -54,6 +54,9 @@ func (ts *tableSearch) reset() {
|
||||||
|
|
||||||
// Init initializes the ts.
|
// Init initializes the ts.
|
||||||
//
|
//
|
||||||
|
// tsids must be sorted.
|
||||||
|
// tsids cannot be modified after the Init call, since it is owned by ts.
|
||||||
|
//
|
||||||
// MustClose must be called then the tableSearch is done.
|
// MustClose must be called then the tableSearch is done.
|
||||||
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
|
func (ts *tableSearch) Init(tb *table, tsids []TSID, tr TimeRange, fetchData bool) {
|
||||||
if ts.needClosing {
|
if ts.needClosing {
|
||||||
|
|
Loading…
Reference in a new issue