package storage

import (


const (
	// Prefix for MetricName->TSID entries.
	nsPrefixMetricNameToTSID = 0

	// Prefix for Tag->MetricID entries.
	nsPrefixTagToMetricIDs = 1

	// Prefix for MetricID->TSID entries.
	nsPrefixMetricIDToTSID = 2

	// Prefix for MetricID->MetricName entries.
	nsPrefixMetricIDToMetricName = 3

	// Prefix for deleted MetricID entries.
	nsPrefixDeletedMetricID = 4

	// Prefix for Date->MetricID entries.
	nsPrefixDateToMetricID = 5

	// Prefix for (Date,Tag)->MetricID entries.
	nsPrefixDateTagToMetricIDs = 6

// indexDB represents an index db.
type indexDB struct {
	// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
	// See .

	refCount uint64

	// The counter for newly created time series. It can be used for determining time series churn rate.
	newTimeseriesCreated uint64

	// The counter for time series which were re-populated from previous indexDB after the rotation.
	timeseriesRepopulated uint64

	// The number of missing MetricID -> TSID entries.
	// High rate for this value means corrupted indexDB.
	missingTSIDsForMetricID uint64

	// The number of calls for date range searches.
	dateRangeSearchCalls uint64

	// The number of hits for date range searches.
	dateRangeSearchHits uint64

	// The number of calls for global search.
	globalSearchCalls uint64

	// missingMetricNamesForMetricID is a counter of missing MetricID -> MetricName entries.
	// High rate may mean corrupted indexDB due to unclean shutdown.
	// The db must be automatically recovered after that.
	missingMetricNamesForMetricID uint64

	mustDrop uint64

	// generation identifies the index generation ID
	// and is used for syncing items from different indexDBs
	generation uint64

	// The unix timestamp in seconds for the indexDB rotation.
	rotationTimestamp uint64

	name string
	tb   *mergeset.Table

	extDB     *indexDB
	extDBLock sync.Mutex

	// Cache for fast TagFilters -> MetricIDs lookup.
	tagFiltersToMetricIDsCache *workingsetcache.Cache

	// The parent storage.
	s *Storage

	// Cache for (date, tagFilter) -> loopsCount, which is used for reducing
	// the amount of work when matching a set of filters.
	loopsPerDateTagFilterCache *workingsetcache.Cache

	indexSearchPool sync.Pool

var maxTagFiltersCacheSize int

// SetTagFiltersCacheSize overrides the default size of tagFiltersToMetricIDsCache
func SetTagFiltersCacheSize(size int) {
	maxTagFiltersCacheSize = size

func getTagFiltersCacheSize() int {
	if maxTagFiltersCacheSize <= 0 {
		return int(float64(memory.Allowed()) / 32)
	return maxTagFiltersCacheSize

// openIndexDB opens index db from the given path.
// The last segment of the path should contain unique hex value which
// will be then used as indexDB.generation
// The rotationTimestamp must be set to the current unix timestamp when openIndexDB
// is called when creating new indexdb during indexdb rotation.
func openIndexDB(path string, s *Storage, rotationTimestamp uint64, isReadOnly *uint32) (*indexDB, error) {
	if s == nil {
		logger.Panicf("BUG: Storage must be nin-nil")

	name := filepath.Base(path)
	gen, err := strconv.ParseUint(name, 16, 64)
	if err != nil {
		return nil, fmt.Errorf("failed to parse indexdb path %q: %w", path, err)

	tb, err := mergeset.OpenTable(path, invalidateTagFiltersCache, mergeTagToMetricIDsRows, isReadOnly)
	if err != nil {
		return nil, fmt.Errorf("cannot open indexDB %q: %w", path, err)

	// Do not persist tagFiltersToMetricIDsCache in files, since it is very volatile.
	mem := memory.Allowed()
	tagFiltersCacheSize := getTagFiltersCacheSize()

	db := &indexDB{
		refCount:          1,
		generation:        gen,
		rotationTimestamp: rotationTimestamp,
		tb:                tb,
		name:              name,

		tagFiltersToMetricIDsCache: workingsetcache.New(tagFiltersCacheSize),
		s:                          s,
		loopsPerDateTagFilterCache: workingsetcache.New(mem / 128),
	return db, nil

const noDeadline = 1<<64 - 1

// IndexDBMetrics contains essential metrics for indexDB.
type IndexDBMetrics struct {
	TagFiltersToMetricIDsCacheSize         uint64
	TagFiltersToMetricIDsCacheSizeBytes    uint64
	TagFiltersToMetricIDsCacheSizeMaxBytes uint64
	TagFiltersToMetricIDsCacheRequests     uint64
	TagFiltersToMetricIDsCacheMisses       uint64

	DeletedMetricsCount uint64

	IndexDBRefCount uint64

	NewTimeseriesCreated    uint64
	TimeseriesRepopulated   uint64
	MissingTSIDsForMetricID uint64

	RecentHourMetricIDsSearchCalls uint64
	RecentHourMetricIDsSearchHits  uint64

	DateRangeSearchCalls uint64
	DateRangeSearchHits  uint64
	GlobalSearchCalls    uint64

	MissingMetricNamesForMetricID uint64

	IndexBlocksWithMetricIDsProcessed      uint64
	IndexBlocksWithMetricIDsIncorrectOrder uint64

	MinTimestampForCompositeIndex     uint64
	CompositeFilterSuccessConversions uint64
	CompositeFilterMissingConversions uint64


func (db *indexDB) scheduleToDrop() {
	atomic.AddUint64(&db.mustDrop, 1)

// UpdateMetrics updates m with metrics from the db.
func (db *indexDB) UpdateMetrics(m *IndexDBMetrics) {
	var cs fastcache.Stats

	m.TagFiltersToMetricIDsCacheSize += cs.EntriesCount
	m.TagFiltersToMetricIDsCacheSizeBytes += cs.BytesSize
	m.TagFiltersToMetricIDsCacheSizeMaxBytes += cs.MaxBytesSize
	m.TagFiltersToMetricIDsCacheRequests += cs.GetCalls
	m.TagFiltersToMetricIDsCacheMisses += cs.Misses

	m.DeletedMetricsCount += uint64(db.s.getDeletedMetricIDs().Len())

	m.IndexDBRefCount += atomic.LoadUint64(&db.refCount)
	m.NewTimeseriesCreated += atomic.LoadUint64(&db.newTimeseriesCreated)
	m.TimeseriesRepopulated += atomic.LoadUint64(&db.timeseriesRepopulated)
	m.MissingTSIDsForMetricID += atomic.LoadUint64(&db.missingTSIDsForMetricID)

	m.DateRangeSearchCalls += atomic.LoadUint64(&db.dateRangeSearchCalls)
	m.DateRangeSearchHits += atomic.LoadUint64(&db.dateRangeSearchHits)
	m.GlobalSearchCalls += atomic.LoadUint64(&db.globalSearchCalls)

	m.MissingMetricNamesForMetricID += atomic.LoadUint64(&db.missingMetricNamesForMetricID)

	m.IndexBlocksWithMetricIDsProcessed = atomic.LoadUint64(&indexBlocksWithMetricIDsProcessed)
	m.IndexBlocksWithMetricIDsIncorrectOrder = atomic.LoadUint64(&indexBlocksWithMetricIDsIncorrectOrder)

	m.MinTimestampForCompositeIndex = uint64(db.s.minTimestampForCompositeIndex)
	m.CompositeFilterSuccessConversions = atomic.LoadUint64(&compositeFilterSuccessConversions)
	m.CompositeFilterMissingConversions = atomic.LoadUint64(&compositeFilterMissingConversions)

	db.doExtDB(func(extDB *indexDB) {
		m.IndexDBRefCount += atomic.LoadUint64(&extDB.refCount)

func (db *indexDB) doExtDB(f func(extDB *indexDB)) bool {
	extDB := db.extDB
	if extDB != nil {
	if extDB == nil {
		return false
	return true

// SetExtDB sets external db to search.
// It decrements refCount for the previous extDB.
func (db *indexDB) SetExtDB(extDB *indexDB) {
	prevExtDB := db.extDB
	db.extDB = extDB

	if prevExtDB != nil {

// MustClose closes db.
func (db *indexDB) MustClose() {

func (db *indexDB) incRef() {
	atomic.AddUint64(&db.refCount, 1)

func (db *indexDB) decRef() {
	n := atomic.AddUint64(&db.refCount, ^uint64(0))
	if int64(n) < 0 {
		logger.Panicf("BUG: negative refCount: %d", n)
	if n > 0 {

	tbPath := db.tb.Path()

	// Free space occupied by caches owned by db.

	db.tagFiltersToMetricIDsCache = nil
	db.s = nil
	db.loopsPerDateTagFilterCache = nil

	if atomic.LoadUint64(&db.mustDrop) == 0 {

	logger.Infof("dropping indexDB %q", tbPath)
	logger.Infof("indexDB %q has been dropped", tbPath)

var tagBufPool bytesutil.ByteBufferPool

func (db *indexDB) getMetricIDsFromTagFiltersCache(qt *querytracer.Tracer, key []byte) ([]uint64, bool) {
	qt = qt.NewChild("search for metricIDs in tag filters cache")
	defer qt.Done()
	buf := tagBufPool.Get()
	defer tagBufPool.Put(buf)
	buf.B = db.tagFiltersToMetricIDsCache.GetBig(buf.B[:0], key)
	if len(buf.B) == 0 {
		qt.Printf("cache miss")
		return nil, false
	qt.Printf("found metricIDs with size: %d bytes", len(buf.B))
	metricIDs, err := unmarshalMetricIDs(nil, buf.B)
	if err != nil {
		logger.Panicf("FATAL: cannot unmarshal metricIDs from tagFiltersToMetricIDsCache: %s", err)
	qt.Printf("unmarshaled %d metricIDs", len(metricIDs))
	return metricIDs, true

func (db *indexDB) putMetricIDsToTagFiltersCache(qt *querytracer.Tracer, metricIDs []uint64, key []byte) {
	qt = qt.NewChild("put %d metricIDs in cache", len(metricIDs))
	defer qt.Done()
	buf := tagBufPool.Get()
	buf.B = marshalMetricIDs(buf.B, metricIDs)
	qt.Printf("marshaled %d metricIDs into %d bytes", len(metricIDs), len(buf.B))
	db.tagFiltersToMetricIDsCache.SetBig(key, buf.B)
	qt.Printf("stored %d metricIDs into cache", len(metricIDs))

func (db *indexDB) getFromMetricIDCache(dst *TSID, metricID uint64) error {
	// There is no need in checking for deleted metricIDs here, since they
	// must be checked by the caller.
	buf := (*[unsafe.Sizeof(*dst)]byte)(unsafe.Pointer(dst))
	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
	tmp := db.s.metricIDCache.Get(buf[:0], key[:])
	if len(tmp) == 0 {
		// The TSID for the given metricID wasn't found in the cache.
		return io.EOF
	if &tmp[0] != &buf[0] || len(tmp) != len(buf) {
		return fmt.Errorf("corrupted MetricID->TSID cache: unexpected size for metricID=%d value; got %d bytes; want %d bytes", metricID, len(tmp), len(buf))
	return nil

func (db *indexDB) putToMetricIDCache(metricID uint64, tsid *TSID) {
	buf := (*[unsafe.Sizeof(*tsid)]byte)(unsafe.Pointer(tsid))
	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
	db.s.metricIDCache.Set(key[:], buf[:])

func (db *indexDB) getMetricNameFromCache(dst []byte, metricID uint64) []byte {
	// There is no need in checking for deleted metricIDs here, since they
	// must be checked by the caller.
	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
	return db.s.metricNameCache.Get(dst, key[:])

func (db *indexDB) putMetricNameToCache(metricID uint64, metricName []byte) {
	key := (*[unsafe.Sizeof(metricID)]byte)(unsafe.Pointer(&metricID))
	db.s.metricNameCache.Set(key[:], metricName)

// maybeCreateIndexes probabilistically creates global and per-day indexes for the given (tsid, metricNameRaw, date) at db.
// The probability increases from 0 to 100% during the first hour since db rotation.
// It returns true if new index entry was created, and false if it was skipped.
func (is *indexSearch) maybeCreateIndexes(tsid *TSID, metricNameRaw []byte, date uint64) (bool, error) {
	pMin := float64(fasttime.UnixTimestamp()-is.db.rotationTimestamp) / 3600
	if pMin < 1 {
		p := float64(uint32(fastHashUint64(tsid.MetricID))) / (1 << 32)
		if p > pMin {
			// Fast path: there is no need creating indexes for metricNameRaw yet.
			return false, nil
	// Slow path: create indexes for (tsid, metricNameRaw) at db.
	mn := GetMetricName()
	if err := mn.UnmarshalRaw(metricNameRaw); err != nil {
		return false, fmt.Errorf("cannot unmarshal metricNameRaw %q: %w", metricNameRaw, err)
	is.createGlobalIndexes(tsid, mn)
	is.createPerDayIndexes(date, tsid.MetricID, mn)
	atomic.AddUint64(&is.db.timeseriesRepopulated, 1)
	return true, nil

func marshalTagFiltersKey(dst []byte, tfss []*TagFilters, tr TimeRange, versioned bool) []byte {
	prefix := ^uint64(0)
	if versioned {
		prefix = atomic.LoadUint64(&tagFiltersKeyGen)
	// Round start and end times to per-day granularity according to per-day inverted index.
	startDate := uint64(tr.MinTimestamp) / msecPerDay
	endDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	dst = encoding.MarshalUint64(dst, prefix)
	dst = encoding.MarshalUint64(dst, startDate)
	dst = encoding.MarshalUint64(dst, endDate)
	for _, tfs := range tfss {
		dst = append(dst, 0) // separator between tfs groups.
		for i := range tfs.tfs {
			dst = tfs.tfs[i].Marshal(dst)
	return dst

func invalidateTagFiltersCache() {
	// This function must be fast, since it is called each
	// time new timeseries is added.
	atomic.AddUint64(&tagFiltersKeyGen, 1)

var tagFiltersKeyGen uint64

func marshalMetricIDs(dst []byte, metricIDs []uint64) []byte {
	dst = encoding.MarshalUint64(dst, uint64(len(metricIDs)))
	if len(metricIDs) == 0 {
		return dst
	var buf []byte
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&buf))
	sh.Data = uintptr(unsafe.Pointer(&metricIDs[0]))
	sh.Cap = 8 * len(metricIDs)
	sh.Len = sh.Cap
	dst = append(dst, buf...)
	return dst

func unmarshalMetricIDs(dst []uint64, src []byte) ([]uint64, error) {
	if len(src)%8 != 0 {
		return dst, fmt.Errorf("cannot unmarshal metricIDs from buffer of %d bytes; the buffer length must divide by 8", len(src))
	if len(src) < 8 {
		return dst, fmt.Errorf("cannot unmarshal metricIDs len from buffer of %d bytes; need at least 8 bytes", len(src))
	n := encoding.UnmarshalUint64(src)
	if n > ((1<<64)-1)/8 {
		return dst, fmt.Errorf("unexpectedly high metricIDs len: %d bytes; must be lower than %d bytes", n, uint64(((1<<64)-1)/8))
	src = src[8:]
	if n*8 != uint64(len(src)) {
		return dst, fmt.Errorf("unexpected buffer length for unmarshaling metricIDs; got %d bytes; want %d bytes", n*8, len(src))
	if n == 0 {
		return dst, nil
	var metricIDs []uint64
	sh := (*reflect.SliceHeader)(unsafe.Pointer(&metricIDs))
	sh.Data = uintptr(unsafe.Pointer(&src[0]))
	sh.Cap = sh.Len
	sh.Len = len(src) / 8
	dst = append(dst, metricIDs...)
	return dst, nil

// getTSIDByNameNoCreate fills the dst with TSID for the given metricName.
// It returns io.EOF if the given mn isn't found locally.
func (db *indexDB) getTSIDByNameNoCreate(dst *TSID, metricName []byte) error {
	is := db.getIndexSearch(noDeadline)
	err := is.getTSIDByMetricName(dst, metricName)
	if err == nil {
		return nil
	if err != io.EOF {
		return fmt.Errorf("cannot search TSID by MetricName %q: %w", metricName, err)

	// Do not search for the TSID in the external storage,
	// since this function is already called by another indexDB instance.

	// The TSID for the given mn wasn't found.
	return io.EOF

type indexSearch struct {
	db *indexDB
	ts mergeset.TableSearch
	kb bytesutil.ByteBuffer
	mp tagToMetricIDsRowParser

	// deadline in unix timestamp seconds for the given search.
	deadline uint64

	// tsidByNameMisses and tsidByNameSkips is used for a performance
	// hack in GetOrCreateTSIDByName. See the comment there.
	tsidByNameMisses int
	tsidByNameSkips  int

// GetOrCreateTSIDByName fills the dst with TSID for the given metricName.
// It also registers the metricName in global and per-day indexes
// for the given date if the metricName->TSID entry is missing in the index.
func (is *indexSearch) GetOrCreateTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
	// A hack: skip searching for the TSID after many serial misses.
	// This should improve insertion performance for big batches
	// of new time series.
	if is.tsidByNameMisses < 100 {
		err := is.getTSIDByMetricName(dst, metricName)
		if err == nil {
			// Fast path - the TSID for the given metricName has been found in the index.
			is.tsidByNameMisses = 0
			if err = is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
				return err
			// There is no need in checking whether the TSID is present in the per-day index for the given date,
			// since this check must be performed by the caller in an optimized way.
			// See storage.updatePerDateData() function.
			return nil
		if err != io.EOF {
			userReadableMetricName := getUserReadableMetricName(metricNameRaw)
			return fmt.Errorf("cannot search TSID by MetricName %s: %w", userReadableMetricName, err)
	} else {
		if is.tsidByNameSkips > 10000 {
			is.tsidByNameSkips = 0
			is.tsidByNameMisses = 0

	// TSID for the given name wasn't found. Create it.
	// It is OK if duplicate TSID for mn is created by concurrent goroutines.
	// Metric results will be merged by mn after TableSearch.
	if err := is.createTSIDByName(dst, metricName, metricNameRaw, date); err != nil {
		userReadableMetricName := getUserReadableMetricName(metricNameRaw)
		return fmt.Errorf("cannot create TSID by MetricName %s: %w", userReadableMetricName, err)
	return nil

func (db *indexDB) getIndexSearch(deadline uint64) *indexSearch {
	v := db.indexSearchPool.Get()
	if v == nil {
		v = &indexSearch{
			db: db,
	is := v.(*indexSearch)
	is.deadline = deadline
	return is

func (db *indexDB) putIndexSearch(is *indexSearch) {
	is.deadline = 0

	// Do not reset tsidByNameMisses and tsidByNameSkips,
	// since they are used in GetOrCreateTSIDByName across call boundaries.


func (is *indexSearch) createTSIDByName(dst *TSID, metricName, metricNameRaw []byte, date uint64) error {
	mn := GetMetricName()
	defer PutMetricName(mn)
	if err := mn.Unmarshal(metricName); err != nil {
		return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)

	created, err := is.db.getOrCreateTSID(dst, metricName, mn)
	if err != nil {
		return fmt.Errorf("cannot generate TSID: %w", err)
	if err := is.db.s.registerSeriesCardinality(dst.MetricID, metricNameRaw); err != nil {
		return err
	is.createGlobalIndexes(dst, mn)
	is.createPerDayIndexes(date, dst.MetricID, mn)

	// There is no need in invalidating tag cache, since it is invalidated
	// on db.tb flush via invalidateTagFiltersCache flushCallback passed to OpenTable.

	if created {
		// Increase the newTimeseriesCreated counter only if tsid wasn't found in indexDB
		atomic.AddUint64(&is.db.newTimeseriesCreated, 1)
		if logNewSeries {
			logger.Infof("new series created: %s", mn.String())
	return nil

// SetLogNewSeries updates new series logging.
// This function must be called before any calling any storage functions.
func SetLogNewSeries(ok bool) {
	logNewSeries = ok

var logNewSeries = false

// getOrCreateTSID looks for existing TSID for the given metricName in db.extDB or creates a new TSID if nothing was found.
// Returns true if TSID was created or false if TSID was in extDB
func (db *indexDB) getOrCreateTSID(dst *TSID, metricName []byte, mn *MetricName) (bool, error) {
	// Search the TSID in the external storage.
	// This is usually the db from the previous period.
	var err error
	if db.doExtDB(func(extDB *indexDB) {
		err = extDB.getTSIDByNameNoCreate(dst, metricName)
	}) {
		if err == nil {
			// The TSID has been found in the external storage.
			return false, nil
		if err != io.EOF {
			return false, fmt.Errorf("external search failed: %w", err)
	// The TSID wasn't found in the external storage.
	// Generate it locally.
	generateTSID(dst, mn)
	return true, nil

func generateTSID(dst *TSID, mn *MetricName) {
	dst.MetricGroupID = xxhash.Sum64(mn.MetricGroup)
	// Assume that the job-like metric is put at mn.Tags[0], while instance-like metric is put at mn.Tags[1]
	// This assumption is true because mn.Tags must be sorted with mn.sortTags() before calling generateTSID() function.
	// This allows grouping data blocks for the same (job, instance) close to each other on disk.
	// This reduces disk seeks and disk read IO when data blocks are read from disk for the same job and/or instance.
	// For example, data blocks for time series matching `process_resident_memory_bytes{job="vmstorage"}` are physically adjacent on disk.
	if len(mn.Tags) > 0 {
		dst.JobID = uint32(xxhash.Sum64(mn.Tags[0].Value))
	if len(mn.Tags) > 1 {
		dst.InstanceID = uint32(xxhash.Sum64(mn.Tags[1].Value))
	dst.MetricID = generateUniqueMetricID()

func (is *indexSearch) createGlobalIndexes(tsid *TSID, mn *MetricName) {
	// The order of index items is important.
	// It guarantees index consistency.

	ii := getIndexItems()
	defer putIndexItems(ii)

	// Create MetricName -> TSID index.
	ii.B = append(ii.B, nsPrefixMetricNameToTSID)
	ii.B = mn.Marshal(ii.B)
	ii.B = append(ii.B, kvSeparatorChar)
	ii.B = tsid.Marshal(ii.B)

	// Create MetricID -> MetricName index.
	ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToMetricName)
	ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
	ii.B = mn.Marshal(ii.B)

	// Create MetricID -> TSID index.
	ii.B = marshalCommonPrefix(ii.B, nsPrefixMetricIDToTSID)
	ii.B = encoding.MarshalUint64(ii.B, tsid.MetricID)
	ii.B = tsid.Marshal(ii.B)

	prefix := kbPool.Get()
	prefix.B = marshalCommonPrefix(prefix.B[:0], nsPrefixTagToMetricIDs)
	ii.registerTagIndexes(prefix.B, mn, tsid.MetricID)


type indexItems struct {
	B     []byte
	Items [][]byte

	start int

func (ii *indexItems) reset() {
	ii.B = ii.B[:0]
	ii.Items = ii.Items[:0]
	ii.start = 0

func (ii *indexItems) Next() {
	ii.Items = append(ii.Items, ii.B[ii.start:])
	ii.start = len(ii.B)

func getIndexItems() *indexItems {
	v := indexItemsPool.Get()
	if v == nil {
		return &indexItems{}
	return v.(*indexItems)

func putIndexItems(ii *indexItems) {

var indexItemsPool sync.Pool

// SearchLabelNamesWithFiltersOnTimeRange returns all the label names, which match the given tfss on the given tr.
func (db *indexDB) SearchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int, deadline uint64) ([]string, error) {
	qt = qt.NewChild("search for label names: filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", tfss, &tr, maxLabelNames, maxMetrics)
	defer qt.Done()
	lns := make(map[string]struct{})
	qtChild := qt.NewChild("search for label names in the current indexdb")
	is := db.getIndexSearch(deadline)
	err := is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics)
	qtChild.Donef("found %d label names", len(lns))
	if err != nil {
		return nil, err

	db.doExtDB(func(extDB *indexDB) {
		qtChild := qt.NewChild("search for label names in the previous indexdb")
		lnsLen := len(lns)
		is := extDB.getIndexSearch(deadline)
		err = is.searchLabelNamesWithFiltersOnTimeRange(qtChild, lns, tfss, tr, maxLabelNames, maxMetrics)
		qtChild.Donef("found %d additional label names", len(lns)-lnsLen)
	if err != nil {
		return nil, err

	labelNames := make([]string, 0, len(lns))
	for labelName := range lns {
		labelNames = append(labelNames, labelName)
	// Do not sort label names, since they must be sorted by vmselect.
	qt.Printf("found %d label names in the current and the previous indexdb", len(labelNames))
	return labelNames, nil

func (is *indexSearch) searchLabelNamesWithFiltersOnTimeRange(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, tr TimeRange, maxLabelNames, maxMetrics int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
		qtChild := qt.NewChild("search for label names in global index: filters=%s", tfss)
		err := is.searchLabelNamesWithFiltersOnDate(qtChild, lns, tfss, 0, maxLabelNames, maxMetrics)
		return err
	var mu sync.Mutex
	wg := getWaitGroup()
	var errGlobal error
	qt = qt.NewChild("parallel search for label names: filters=%s, timeRange=%s", tfss, &tr)
	for date := minDate; date <= maxDate; date++ {
		qtChild := qt.NewChild("search for label names: filters=%s, date=%s", tfss, dateToString(date))
		go func(date uint64) {
			defer func() {
			lnsLocal := make(map[string]struct{})
			isLocal := is.db.getIndexSearch(is.deadline)
			err := isLocal.searchLabelNamesWithFiltersOnDate(qtChild, lnsLocal, tfss, date, maxLabelNames, maxMetrics)
			defer mu.Unlock()
			if errGlobal != nil {
			if err != nil {
				errGlobal = err
			if len(lns) >= maxLabelNames {
			for k := range lnsLocal {
				lns[k] = struct{}{}
	return errGlobal

func (is *indexSearch) searchLabelNamesWithFiltersOnDate(qt *querytracer.Tracer, lns map[string]struct{}, tfss []*TagFilters, date uint64, maxLabelNames, maxMetrics int) error {
	filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
	if err != nil {
		return err
	if filter != nil && filter.Len() <= 100e3 {
		// It is faster to obtain label names by metricIDs from the filter
		// instead of scanning the inverted index for the matching filters.
		// This would help
		metricIDs := filter.AppendTo(nil)
		qt.Printf("sort %d metricIDs", len(metricIDs))
		return is.getLabelNamesForMetricIDs(qt, metricIDs, lns, maxLabelNames)
	var prevLabelName []byte
	ts := &is.ts
	kb := &is.kb
	mp := &
	dmis := is.db.s.getDeletedMetricIDs()
	loopsPaceLimiter := 0
	underscoreNameSeen := false
	nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
	if date == 0 {
		nsPrefixExpected = nsPrefixTagToMetricIDs
	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	prefix := kb.B
	for len(lns) < maxLabelNames && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
		if err := mp.Init(item, nsPrefixExpected); err != nil {
			return err
		if mp.GetMatchingSeriesCount(filter, dmis) == 0 {
		labelName := mp.Tag.Key
		if len(labelName) == 0 {
			underscoreNameSeen = true
		if isArtificialTagKey(labelName) || string(labelName) == string(prevLabelName) {
			// Search for the next tag key.
			// The last char in kb.B must be tagSeparatorChar.
			// Just increment it in order to jump to the next tag key.
			kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
			if len(labelName) > 0 && labelName[0] == compositeTagKeyPrefix {
				// skip composite tag entries
				kb.B = append(kb.B, compositeTagKeyPrefix)
			} else {
				kb.B = marshalTagValue(kb.B, labelName)
		lns[string(labelName)] = struct{}{}
		prevLabelName = append(prevLabelName[:0], labelName...)
	if underscoreNameSeen {
		lns["__name__"] = struct{}{}
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error during search for prefix %q: %w", prefix, err)
	return nil

func (is *indexSearch) getLabelNamesForMetricIDs(qt *querytracer.Tracer, metricIDs []uint64, lns map[string]struct{}, maxLabelNames int) error {
	lns["__name__"] = struct{}{}
	var mn MetricName
	foundLabelNames := 0
	var buf []byte
	for _, metricID := range metricIDs {
		var err error
		buf, err = is.searchMetricNameWithCache(buf[:0], metricID)
		if err != nil {
			if err == io.EOF {
				// It is likely the metricID->metricName entry didn't propagate to inverted index yet.
				// Skip this metricID for now.
			return fmt.Errorf("cannot find metricName by metricID %d: %w", metricID, err)
		if err := mn.Unmarshal(buf); err != nil {
			return fmt.Errorf("cannot unmarshal metricName %q: %w", buf, err)
		for _, tag := range mn.Tags {
			_, ok := lns[string(tag.Key)]
			if !ok {
				lns[string(tag.Key)] = struct{}{}
				if len(lns) >= maxLabelNames {
					qt.Printf("hit the limit on the number of unique label names: %d", maxLabelNames)
					return nil
	qt.Printf("get %d distinct label names from %d metricIDs", foundLabelNames, len(metricIDs))
	return nil

// SearchLabelValuesWithFiltersOnTimeRange returns label values for the given labelName, tfss and tr.
func (db *indexDB) SearchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, labelName string, tfss []*TagFilters, tr TimeRange,
	maxLabelValues, maxMetrics int, deadline uint64) ([]string, error) {
	qt = qt.NewChild("search for label values: labelName=%q, filters=%s, timeRange=%s, maxLabelNames=%d, maxMetrics=%d", labelName, tfss, &tr, maxLabelValues, maxMetrics)
	defer qt.Done()
	lvs := make(map[string]struct{})
	qtChild := qt.NewChild("search for label values in the current indexdb")
	is := db.getIndexSearch(deadline)
	err := is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics)
	qtChild.Donef("found %d label values", len(lvs))
	if err != nil {
		return nil, err
	db.doExtDB(func(extDB *indexDB) {
		qtChild := qt.NewChild("search for label values in the previous indexdb")
		lvsLen := len(lvs)
		is := extDB.getIndexSearch(deadline)
		err = is.searchLabelValuesWithFiltersOnTimeRange(qtChild, lvs, labelName, tfss, tr, maxLabelValues, maxMetrics)
		qtChild.Donef("found %d additional label values", len(lvs)-lvsLen)
	if err != nil {
		return nil, err

	labelValues := make([]string, 0, len(lvs))
	for labelValue := range lvs {
		if len(labelValue) == 0 {
			// Skip empty values, since they have no any meaning.
			// See
		labelValues = append(labelValues, labelValue)
	// Do not sort labelValues, since they must be sorted by vmselect.
	qt.Printf("found %d label values in the current and the previous indexdb", len(labelValues))
	return labelValues, nil

func (is *indexSearch) searchLabelValuesWithFiltersOnTimeRange(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters,
	tr TimeRange, maxLabelValues, maxMetrics int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	if maxDate == 0 || minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
		qtChild := qt.NewChild("search for label values in global index: labelName=%q, filters=%s", labelName, tfss)
		err := is.searchLabelValuesWithFiltersOnDate(qtChild, lvs, labelName, tfss, 0, maxLabelValues, maxMetrics)
		return err
	var mu sync.Mutex
	wg := getWaitGroup()
	var errGlobal error
	qt = qt.NewChild("parallel search for label values: labelName=%q, filters=%s, timeRange=%s", labelName, tfss, &tr)
	for date := minDate; date <= maxDate; date++ {
		qtChild := qt.NewChild("search for label names: filters=%s, date=%s", tfss, dateToString(date))
		go func(date uint64) {
			defer func() {
			lvsLocal := make(map[string]struct{})
			isLocal := is.db.getIndexSearch(is.deadline)
			err := isLocal.searchLabelValuesWithFiltersOnDate(qtChild, lvsLocal, labelName, tfss, date, maxLabelValues, maxMetrics)
			defer mu.Unlock()
			if errGlobal != nil {
			if err != nil {
				errGlobal = err
			if len(lvs) >= maxLabelValues {
			for v := range lvsLocal {
				lvs[v] = struct{}{}
	return errGlobal

func (is *indexSearch) searchLabelValuesWithFiltersOnDate(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, tfss []*TagFilters,
	date uint64, maxLabelValues, maxMetrics int) error {
	filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
	if err != nil {
		return err
	if filter != nil && filter.Len() < 100e3 {
		// It is faster to obtain label values by metricIDs from the filter
		// instead of scanning the inverted index for the matching filters.
		// This would help
		metricIDs := filter.AppendTo(nil)
		qt.Printf("sort %d metricIDs", len(metricIDs))
		return is.getLabelValuesForMetricIDs(qt, lvs, labelName, metricIDs, maxLabelValues)
	if labelName == "__name__" {
		// __name__ label is encoded as empty string in indexdb.
		labelName = ""
	labelNameBytes := bytesutil.ToUnsafeBytes(labelName)
	var prevLabelValue []byte
	ts := &is.ts
	kb := &is.kb
	mp := &
	dmis := is.db.s.getDeletedMetricIDs()
	loopsPaceLimiter := 0
	nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
	if date == 0 {
		nsPrefixExpected = nsPrefixTagToMetricIDs
	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	kb.B = marshalTagValue(kb.B, labelNameBytes)
	prefix := kb.B
	for len(lvs) < maxLabelValues && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
		if err := mp.Init(item, nsPrefixExpected); err != nil {
			return err
		if mp.GetMatchingSeriesCount(filter, dmis) == 0 {
		labelValue := mp.Tag.Value
		if string(labelValue) == string(prevLabelValue) {
			// Search for the next tag value.
			// The last char in kb.B must be tagSeparatorChar.
			// Just increment it in order to jump to the next tag value.
			kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
			kb.B = marshalTagValue(kb.B, labelNameBytes)
			kb.B = marshalTagValue(kb.B, labelValue)
		lvs[string(labelValue)] = struct{}{}
		prevLabelValue = append(prevLabelValue[:0], labelValue...)
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error when searching for tag name prefix %q: %w", prefix, err)
	return nil

func (is *indexSearch) getLabelValuesForMetricIDs(qt *querytracer.Tracer, lvs map[string]struct{}, labelName string, metricIDs []uint64, maxLabelValues int) error {
	if labelName == "" {
		labelName = "__name__"
	var mn MetricName
	foundLabelValues := 0
	var buf []byte
	for _, metricID := range metricIDs {
		var err error
		buf, err = is.searchMetricNameWithCache(buf[:0], metricID)
		if err != nil {
			if err == io.EOF {
				// It is likely the metricID->metricName entry didn't propagate to inverted index yet.
				// Skip this metricID for now.
			return fmt.Errorf("cannot find metricName by metricID %d: %w", metricID, err)
		if err := mn.Unmarshal(buf); err != nil {
			return fmt.Errorf("cannot unmarshal metricName %q: %w", buf, err)
		tagValue := mn.GetTagValue(labelName)
		_, ok := lvs[string(tagValue)]
		if !ok {
			lvs[string(tagValue)] = struct{}{}
			if len(lvs) >= maxLabelValues {
				qt.Printf("hit the limit on the number of unique label values for label %q: %d", labelName, maxLabelValues)
				return nil
	qt.Printf("get %d distinct values for label %q from %d metricIDs", foundLabelValues, labelName, len(metricIDs))
	return nil

// SearchTagValueSuffixes returns all the tag value suffixes for the given tagKey and tagValuePrefix on the given tr.
// This allows implementing or similar APIs.
// If it returns maxTagValueSuffixes suffixes, then it is likely more than maxTagValueSuffixes suffixes is found.
func (db *indexDB) SearchTagValueSuffixes(qt *querytracer.Tracer, tr TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxTagValueSuffixes int, deadline uint64) ([]string, error) {
	qt = qt.NewChild("search tag value suffixes for timeRange=%s, tagKey=%q, tagValuePrefix=%q, delimiter=%c, maxTagValueSuffixes=%d",
		&tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
	defer qt.Done()

	// TODO: cache results?

	tvss := make(map[string]struct{})
	is := db.getIndexSearch(deadline)
	err := is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
	if err != nil {
		return nil, err
	if len(tvss) < maxTagValueSuffixes {
		db.doExtDB(func(extDB *indexDB) {
			is := extDB.getIndexSearch(deadline)
			qtChild := qt.NewChild("search tag value suffixes in the previous indexdb")
			err = is.searchTagValueSuffixesForTimeRange(tvss, tr, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
		if err != nil {
			return nil, err

	suffixes := make([]string, 0, len(tvss))
	for suffix := range tvss {
		// Do not skip empty suffixes, since they may represent leaf tag values.
		suffixes = append(suffixes, suffix)
	if len(suffixes) > maxTagValueSuffixes {
		suffixes = suffixes[:maxTagValueSuffixes]
	// Do not sort suffixes, since they must be sorted by vmselect.
	qt.Printf("found %d suffixes", len(suffixes))
	return suffixes, nil

func (is *indexSearch) searchTagValueSuffixesForTimeRange(tvss map[string]struct{}, tr TimeRange, tagKey, tagValuePrefix string, delimiter byte, maxTagValueSuffixes int) error {
	minDate := uint64(tr.MinTimestamp) / msecPerDay
	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
		return is.searchTagValueSuffixesAll(tvss, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
	// Query over multiple days in parallel.
	wg := getWaitGroup()
	var errGlobal error
	var mu sync.Mutex // protects tvss + errGlobal from concurrent access below.
	for minDate <= maxDate {
		go func(date uint64) {
			defer wg.Done()
			tvssLocal := make(map[string]struct{})
			isLocal := is.db.getIndexSearch(is.deadline)
			err := isLocal.searchTagValueSuffixesForDate(tvssLocal, date, tagKey, tagValuePrefix, delimiter, maxTagValueSuffixes)
			defer mu.Unlock()
			if errGlobal != nil {
			if err != nil {
				errGlobal = err
			if len(tvss) > maxTagValueSuffixes {
			for k := range tvssLocal {
				tvss[k] = struct{}{}
	return errGlobal

func (is *indexSearch) searchTagValueSuffixesAll(tvss map[string]struct{}, tagKey, tagValuePrefix string, delimiter byte, maxTagValueSuffixes int) error {
	kb := &is.kb
	nsPrefix := byte(nsPrefixTagToMetricIDs)
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefix)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagKey))
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValuePrefix))
	kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B
	prefix := append([]byte(nil), kb.B...)
	return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes)

func (is *indexSearch) searchTagValueSuffixesForDate(tvss map[string]struct{}, date uint64, tagKey, tagValuePrefix string, delimiter byte, maxTagValueSuffixes int) error {
	nsPrefix := byte(nsPrefixDateTagToMetricIDs)
	kb := &is.kb
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefix)
	kb.B = encoding.MarshalUint64(kb.B, date)
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagKey))
	kb.B = marshalTagValue(kb.B, bytesutil.ToUnsafeBytes(tagValuePrefix))
	kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar from the end of kb.B
	prefix := append([]byte(nil), kb.B...)
	return is.searchTagValueSuffixesForPrefix(tvss, nsPrefix, prefix, len(tagValuePrefix), delimiter, maxTagValueSuffixes)

func (is *indexSearch) searchTagValueSuffixesForPrefix(tvss map[string]struct{}, nsPrefix byte, prefix []byte, tagValuePrefixLen int, delimiter byte, maxTagValueSuffixes int) error {
	kb := &is.kb
	ts := &is.ts
	mp := &
	dmis := is.db.s.getDeletedMetricIDs()
	loopsPaceLimiter := 0
	for len(tvss) < maxTagValueSuffixes && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
		if err := mp.Init(item, nsPrefix); err != nil {
			return err
		if mp.GetMatchingSeriesCount(nil, dmis) == 0 {
		tagValue := mp.Tag.Value
		suffix := tagValue[tagValuePrefixLen:]
		n := bytes.IndexByte(suffix, delimiter)
		if n < 0 {
			// Found leaf tag value that doesn't have delimiters after the given tagValuePrefix.
			tvss[string(suffix)] = struct{}{}
		// Found non-leaf tag value. Extract suffix that end with the given delimiter.
		suffix = suffix[:n+1]
		tvss[string(suffix)] = struct{}{}
		if suffix[len(suffix)-1] == 255 {
		// Search for the next suffix
		kb.B = append(kb.B[:0], prefix...)
		kb.B = marshalTagValue(kb.B, suffix)
		kb.B = kb.B[:len(kb.B)-1] // remove tagSeparatorChar
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error when searching for tag value sufixes for prefix %q: %w", prefix, err)
	return nil

// GetSeriesCount returns the approximate number of unique timeseries in the db.
// It includes the deleted series too and may count the same series
// up to two times - in db and extDB.
func (db *indexDB) GetSeriesCount(deadline uint64) (uint64, error) {
	is := db.getIndexSearch(deadline)
	n, err := is.getSeriesCount()
	if err != nil {
		return 0, err

	var nExt uint64
	db.doExtDB(func(extDB *indexDB) {
		is := extDB.getIndexSearch(deadline)
		nExt, err = is.getSeriesCount()
	if err != nil {
		return 0, fmt.Errorf("error when searching in extDB: %w", err)
	return n + nExt, nil

func (is *indexSearch) getSeriesCount() (uint64, error) {
	ts := &is.ts
	kb := &is.kb
	mp := &
	loopsPaceLimiter := 0
	var metricIDsLen uint64
	// Extract the number of series from ((__name__=value): metricIDs) rows
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
	kb.B = marshalTagValue(kb.B, nil)
	for ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return 0, err
		item := ts.Item
		if !bytes.HasPrefix(item, kb.B) {
		tail := item[len(kb.B):]
		n := bytes.IndexByte(tail, tagSeparatorChar)
		if n < 0 {
			return 0, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
		tail = tail[n+1:]
		if err := mp.InitOnlyTail(item, tail); err != nil {
			return 0, err
		// Take into account deleted timeseries too.
		// It is OK if series can be counted multiple times in rare cases -
		// the returned number is an estimation.
		metricIDsLen += uint64(mp.MetricIDsLen())
	if err := ts.Error(); err != nil {
		return 0, fmt.Errorf("error when counting unique timeseries: %w", err)
	return metricIDsLen, nil

// GetTSDBStatus returns topN entries for tsdb status for the given tfss, date and focusLabel.
func (db *indexDB) GetTSDBStatus(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int, deadline uint64) (*TSDBStatus, error) {
	qtChild := qt.NewChild("collect tsdb stats in the current indexdb")
	is := db.getIndexSearch(deadline)
	status, err := is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics)
	if err != nil {
		return nil, err
	if status.hasEntries() {
		return status, nil
	db.doExtDB(func(extDB *indexDB) {
		qtChild := qt.NewChild("collect tsdb stats in the previous indexdb")
		is := extDB.getIndexSearch(deadline)
		status, err = is.getTSDBStatus(qtChild, tfss, date, focusLabel, topN, maxMetrics)
	if err != nil {
		return nil, fmt.Errorf("error when obtaining TSDB status from extDB: %w", err)
	return status, nil

// getTSDBStatus returns topN entries for tsdb status for the given tfss, date and focusLabel.
func (is *indexSearch) getTSDBStatus(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, focusLabel string, topN, maxMetrics int) (*TSDBStatus, error) {
	filter, err := is.searchMetricIDsWithFiltersOnDate(qt, tfss, date, maxMetrics)
	if err != nil {
		return nil, err
	if filter != nil && filter.Len() == 0 {
		qt.Printf("no matching series for filter=%s", tfss)
		return &TSDBStatus{}, nil
	ts := &is.ts
	kb := &is.kb
	mp := &
	dmis := is.db.s.getDeletedMetricIDs()
	thSeriesCountByMetricName := newTopHeap(topN)
	thSeriesCountByLabelName := newTopHeap(topN)
	thSeriesCountByFocusLabelValue := newTopHeap(topN)
	thSeriesCountByLabelValuePair := newTopHeap(topN)
	thLabelValueCountByLabelName := newTopHeap(topN)
	var tmp, prevLabelName, prevLabelValuePair []byte
	var labelValueCountByLabelName, seriesCountByLabelValuePair uint64
	var totalSeries, labelSeries, totalLabelValuePairs uint64
	nameEqualBytes := []byte("__name__=")
	focusLabelEqualBytes := []byte(focusLabel + "=")

	loopsPaceLimiter := 0
	nsPrefixExpected := byte(nsPrefixDateTagToMetricIDs)
	if date == 0 {
		nsPrefixExpected = nsPrefixTagToMetricIDs
	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	prefix := kb.B
	for ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return nil, err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
		if err := mp.Init(item, nsPrefixExpected); err != nil {
			return nil, err
		matchingSeriesCount := mp.GetMatchingSeriesCount(filter, dmis)
		if matchingSeriesCount == 0 {
			// Skip rows without matching metricIDs.
		tmp = append(tmp[:0], mp.Tag.Key...)
		labelName := tmp
		if isArtificialTagKey(labelName) {
			// Skip artificially created tag keys.
			kb.B = append(kb.B[:0], prefix...)
			if len(labelName) > 0 && labelName[0] == compositeTagKeyPrefix {
				kb.B = append(kb.B, compositeTagKeyPrefix)
			} else {
				kb.B = marshalTagValue(kb.B, labelName)
		if len(labelName) == 0 {
			labelName = append(labelName, "__name__"...)
			tmp = labelName
		if string(labelName) == "__name__" {
			totalSeries += uint64(matchingSeriesCount)
		tmp = append(tmp, '=')
		tmp = append(tmp, mp.Tag.Value...)
		labelValuePair := tmp
		if len(prevLabelName) == 0 {
			prevLabelName = append(prevLabelName[:0], labelName...)
		if string(labelName) != string(prevLabelName) {
			thLabelValueCountByLabelName.push(prevLabelName, labelValueCountByLabelName)
			thSeriesCountByLabelName.push(prevLabelName, labelSeries)
			labelSeries = 0
			labelValueCountByLabelName = 0
			prevLabelName = append(prevLabelName[:0], labelName...)
		if len(prevLabelValuePair) == 0 {
			prevLabelValuePair = append(prevLabelValuePair[:0], labelValuePair...)
		if string(labelValuePair) != string(prevLabelValuePair) {
			thSeriesCountByLabelValuePair.push(prevLabelValuePair, seriesCountByLabelValuePair)
			if bytes.HasPrefix(prevLabelValuePair, nameEqualBytes) {
				thSeriesCountByMetricName.push(prevLabelValuePair[len(nameEqualBytes):], seriesCountByLabelValuePair)
			if bytes.HasPrefix(prevLabelValuePair, focusLabelEqualBytes) {
				thSeriesCountByFocusLabelValue.push(prevLabelValuePair[len(focusLabelEqualBytes):], seriesCountByLabelValuePair)
			seriesCountByLabelValuePair = 0
			prevLabelValuePair = append(prevLabelValuePair[:0], labelValuePair...)
		// It is OK if series can be counted multiple times in rare cases -
		// the returned number is an estimation.
		labelSeries += uint64(matchingSeriesCount)
		seriesCountByLabelValuePair += uint64(matchingSeriesCount)
		totalLabelValuePairs += uint64(matchingSeriesCount)
	if err := ts.Error(); err != nil {
		return nil, fmt.Errorf("error when counting time series by metric names: %w", err)
	thLabelValueCountByLabelName.push(prevLabelName, labelValueCountByLabelName)
	thSeriesCountByLabelName.push(prevLabelName, labelSeries)
	thSeriesCountByLabelValuePair.push(prevLabelValuePair, seriesCountByLabelValuePair)
	if bytes.HasPrefix(prevLabelValuePair, nameEqualBytes) {
		thSeriesCountByMetricName.push(prevLabelValuePair[len(nameEqualBytes):], seriesCountByLabelValuePair)
	if bytes.HasPrefix(prevLabelValuePair, focusLabelEqualBytes) {
		thSeriesCountByFocusLabelValue.push(prevLabelValuePair[len(focusLabelEqualBytes):], seriesCountByLabelValuePair)
	status := &TSDBStatus{
		TotalSeries:                  totalSeries,
		TotalLabelValuePairs:         totalLabelValuePairs,
		SeriesCountByMetricName:      thSeriesCountByMetricName.getSortedResult(),
		SeriesCountByLabelName:       thSeriesCountByLabelName.getSortedResult(),
		SeriesCountByFocusLabelValue: thSeriesCountByFocusLabelValue.getSortedResult(),
		SeriesCountByLabelValuePair:  thSeriesCountByLabelValuePair.getSortedResult(),
		LabelValueCountByLabelName:   thLabelValueCountByLabelName.getSortedResult(),
	return status, nil

// TSDBStatus contains TSDB status data for /api/v1/status/tsdb.
// See
type TSDBStatus struct {
	TotalSeries                  uint64
	TotalLabelValuePairs         uint64
	SeriesCountByMetricName      []TopHeapEntry
	SeriesCountByLabelName       []TopHeapEntry
	SeriesCountByFocusLabelValue []TopHeapEntry
	SeriesCountByLabelValuePair  []TopHeapEntry
	LabelValueCountByLabelName   []TopHeapEntry

func (status *TSDBStatus) hasEntries() bool {
	return len(status.SeriesCountByLabelValuePair) > 0

// topHeap maintains a heap of topHeapEntries with the maximum TopHeapEntry.n values.
type topHeap struct {
	topN int
	a    []TopHeapEntry

// newTopHeap returns topHeap for topN items.
func newTopHeap(topN int) *topHeap {
	return &topHeap{
		topN: topN,

// TopHeapEntry represents an entry from `top heap` used in stats.
type TopHeapEntry struct {
	Name  string
	Count uint64

func (th *topHeap) push(name []byte, count uint64) {
	if count == 0 {
	if len(th.a) < th.topN {
		th.a = append(th.a, TopHeapEntry{
			Name:  string(name),
			Count: count,
		heap.Fix(th, len(th.a)-1)
	if count <= th.a[0].Count {
	th.a[0] = TopHeapEntry{
		Name:  string(name),
		Count: count,
	heap.Fix(th, 0)

func (th *topHeap) getSortedResult() []TopHeapEntry {
	result := append([]TopHeapEntry{}, th.a...)
	sort.Slice(result, func(i, j int) bool {
		a, b := result[i], result[j]
		if a.Count != b.Count {
			return a.Count > b.Count
		return a.Name < b.Name
	return result

// heap.Interface implementation for topHeap.

func (th *topHeap) Len() int {
	return len(th.a)

func (th *topHeap) Less(i, j int) bool {
	a := th.a
	return a[i].Count < a[j].Count

func (th *topHeap) Swap(i, j int) {
	a := th.a
	a[j], a[i] = a[i], a[j]

func (th *topHeap) Push(x interface{}) {
	panic(fmt.Errorf("BUG: Push shouldn't be called"))

func (th *topHeap) Pop() interface{} {
	panic(fmt.Errorf("BUG: Pop shouldn't be called"))

// searchMetricNameWithCache appends metric name for the given metricID to dst
// and returns the result.
func (db *indexDB) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
	metricName := db.getMetricNameFromCache(dst, metricID)
	if len(metricName) > len(dst) {
		return metricName, nil

	is := db.getIndexSearch(noDeadline)
	var err error
	dst, err = is.searchMetricName(dst, metricID)
	if err == nil {
		// There is no need in verifying whether the given metricID is deleted,
		// since the filtering must be performed before calling this func.
		db.putMetricNameToCache(metricID, dst)
		return dst, nil
	if err != io.EOF {
		return dst, err

	// Try searching in the external indexDB.
	if db.doExtDB(func(extDB *indexDB) {
		is := extDB.getIndexSearch(noDeadline)
		dst, err = is.searchMetricName(dst, metricID)
		if err == nil {
			// There is no need in verifying whether the given metricID is deleted,
			// since the filtering must be performed before calling this func.
			extDB.putMetricNameToCache(metricID, dst)
	}) {
		return dst, err

	// Cannot find MetricName for the given metricID. This may be the case
	// when indexDB contains incomplete set of metricID -> metricName entries
	// after a snapshot or due to unflushed entries.
	atomic.AddUint64(&db.missingMetricNamesForMetricID, 1)

	// Mark the metricID as deleted, so it will be created again when new data point
	// for the given time series will arrive.
	return dst, io.EOF

// DeleteTSIDs marks as deleted all the TSIDs matching the given tfss.
// The caller must reset all the caches which may contain the deleted TSIDs.
// Returns the number of metrics deleted.
func (db *indexDB) DeleteTSIDs(qt *querytracer.Tracer, tfss []*TagFilters) (int, error) {
	qt = qt.NewChild("deleting series for %s", tfss)
	defer qt.Done()
	if len(tfss) == 0 {
		return 0, nil

	// Obtain metricIDs to delete.
	tr := TimeRange{
		MinTimestamp: 0,
		MaxTimestamp: (1 << 63) - 1,
	is := db.getIndexSearch(noDeadline)
	metricIDs, err := is.searchMetricIDs(qt, tfss, tr, 2e9)
	if err != nil {
		return 0, err

	// Delete TSIDs in the extDB.
	deletedCount := len(metricIDs)
	db.doExtDB(func(extDB *indexDB) {
		var n int
		qtChild := qt.NewChild("deleting series from the previos indexdb")
		n, err = extDB.DeleteTSIDs(qtChild, tfss)
		qtChild.Donef("deleted %d series", n)
		deletedCount += n
	if err != nil {
		return deletedCount, fmt.Errorf("cannot delete tsids in extDB: %w", err)
	return deletedCount, nil

func (db *indexDB) deleteMetricIDs(metricIDs []uint64) {
	if len(metricIDs) == 0 {
		// Nothing to delete

	// atomically add deleted metricIDs to an inmemory map.
	dmis := &uint64set.Set{}

	// Reset TagFilters -> TSIDS cache, since it may contain deleted TSIDs.

	// Reset MetricName -> TSID cache, since it may contain deleted TSIDs.

	// Store the metricIDs as deleted.
	// Make this after updating the deletedMetricIDs and resetting caches
	// in order to exclude the possibility of the inconsistent state when the deleted metricIDs
	// remain available in the tsidCache after unclean shutdown.
	// See
	items := getIndexItems()
	for _, metricID := range metricIDs {
		items.B = append(items.B, nsPrefixDeletedMetricID)
		items.B = encoding.MarshalUint64(items.B, metricID)

func (db *indexDB) loadDeletedMetricIDs() (*uint64set.Set, error) {
	is := db.getIndexSearch(noDeadline)
	dmis, err := is.loadDeletedMetricIDs()
	if err != nil {
		return nil, err
	return dmis, nil

func (is *indexSearch) loadDeletedMetricIDs() (*uint64set.Set, error) {
	dmis := &uint64set.Set{}
	ts := &is.ts
	kb := &is.kb
	kb.B = append(kb.B[:0], nsPrefixDeletedMetricID)
	for ts.NextItem() {
		item := ts.Item
		if !bytes.HasPrefix(item, kb.B) {
		item = item[len(kb.B):]
		if len(item) != 8 {
			return nil, fmt.Errorf("unexpected item len; got %d bytes; want %d bytes", len(item), 8)
		metricID := encoding.UnmarshalUint64(item)
	if err := ts.Error(); err != nil {
		return nil, err
	return dmis, nil

func (db *indexDB) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int, deadline uint64) ([]uint64, error) {
	qt = qt.NewChild("search for matching metricIDs: filters=%s, timeRange=%s", tfss, &tr)
	defer qt.Done()

	if len(tfss) == 0 {
		return nil, nil
	if tr.MinTimestamp >= db.s.minTimestampForCompositeIndex {
		tfss = convertToCompositeTagFilterss(tfss)

	qtChild := qt.NewChild("search for metricIDs in the current indexdb")
	tfKeyBuf := tagFiltersKeyBufPool.Get()
	defer tagFiltersKeyBufPool.Put(tfKeyBuf)

	tfKeyBuf.B = marshalTagFiltersKey(tfKeyBuf.B[:0], tfss, tr, true)
	metricIDs, ok := db.getMetricIDsFromTagFiltersCache(qtChild, tfKeyBuf.B)
	if ok {
		// Fast path - metricIDs found in the cache
		return metricIDs, nil

	// Slow path - search for metricIDs in the db and extDB.
	is := db.getIndexSearch(deadline)
	localMetricIDs, err := is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
	if err != nil {
		return nil, fmt.Errorf("error when searching for metricIDs in the current indexdb: %s", err)

	var extMetricIDs []uint64
	db.doExtDB(func(extDB *indexDB) {
		qtChild := qt.NewChild("search for metricIDs in the previous indexdb")
		defer qtChild.Done()

		tfKeyExtBuf := tagFiltersKeyBufPool.Get()
		defer tagFiltersKeyBufPool.Put(tfKeyExtBuf)

		// Data in extDB cannot be changed, so use unversioned keys for tag cache.
		tfKeyExtBuf.B = marshalTagFiltersKey(tfKeyExtBuf.B[:0], tfss, tr, false)
		metricIDs, ok := extDB.getMetricIDsFromTagFiltersCache(qtChild, tfKeyExtBuf.B)
		if ok {
			extMetricIDs = metricIDs
		is := extDB.getIndexSearch(deadline)
		extMetricIDs, err = is.searchMetricIDs(qtChild, tfss, tr, maxMetrics)
		extDB.putMetricIDsToTagFiltersCache(qtChild, extMetricIDs, tfKeyExtBuf.B)
	if err != nil {
		return nil, fmt.Errorf("error when searching for metricIDs in the previous indexdb: %s", err)

	// Merge localMetricIDs with extMetricIDs.
	metricIDs = mergeSortedMetricIDs(localMetricIDs, extMetricIDs)
	qt.Printf("merge %d metricIDs from the current indexdb with %d metricIDs from the previous indexdb; result: %d metricIDs",
		len(localMetricIDs), len(extMetricIDs), len(metricIDs))

	// Store metricIDs in the cache.
	db.putMetricIDsToTagFiltersCache(qt, metricIDs, tfKeyBuf.B)

	return metricIDs, nil

func mergeSortedMetricIDs(a, b []uint64) []uint64 {
	if len(b) == 0 {
		return a
	i := 0
	j := 0
	result := make([]uint64, 0, len(a)+len(b))
	for {
		next := b[j]
		start := i
		for i < len(a) && a[i] <= next {
		result = append(result, a[start:i]...)
		if len(result) > 0 {
			last := result[len(result)-1]
			for j < len(b) && b[j] == last {
		if i == len(a) {
			return append(result, b[j:]...)
		a, b = b, a
		i, j = j, i

func (db *indexDB) getTSIDsFromMetricIDs(qt *querytracer.Tracer, metricIDs []uint64, deadline uint64) ([]TSID, error) {
	qt = qt.NewChild("obtain tsids from %d metricIDs", len(metricIDs))
	defer qt.Done()

	if len(metricIDs) == 0 {
		return nil, nil
	tsids := make([]TSID, len(metricIDs))
	var extMetricIDs []uint64
	i := 0
	err := func() error {
		is := db.getIndexSearch(deadline)
		defer db.putIndexSearch(is)
		for loopsPaceLimiter, metricID := range metricIDs {
			if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
				if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
					return err
			// Try obtaining TSIDs from MetricID->TSID cache. This is much faster
			// than scanning the mergeset if it contains a lot of metricIDs.
			tsid := &tsids[i]
			err := is.db.getFromMetricIDCache(tsid, metricID)
			if err == nil {
				// Fast path - the tsid for metricID is found in cache.
			if err != io.EOF {
				return err
			if err := is.getTSIDByMetricID(tsid, metricID); err != nil {
				if err == io.EOF {
					// Postpone searching for the metricID in the extDB.
					extMetricIDs = append(extMetricIDs, metricID)
				return fmt.Errorf("cannot find tsid %d out of %d for metricID %d: %w", i, len(metricIDs), metricID, err)
			is.db.putToMetricIDCache(metricID, tsid)
		return nil
	if err != nil {
		return nil, fmt.Errorf("error when searching for TISDs by metricIDs in the current indexdb: %w", err)
	tsidsFound := i
	qt.Printf("found %d tsids for %d metricIDs in the current indexdb", tsidsFound, len(metricIDs))

	// Search for extMetricIDs in the extDB.
	db.doExtDB(func(extDB *indexDB) {
		is := extDB.getIndexSearch(deadline)
		defer extDB.putIndexSearch(is)
		for loopsPaceLimiter, metricID := range extMetricIDs {
			if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
				if err = checkSearchDeadlineAndPace(is.deadline); err != nil {
			// There is no need in searching for TSIDs in MetricID->TSID cache, since
			// this has been already done in the loop above (the MetricID->TSID cache is global).
			tsid := &tsids[i]
			if err = is.getTSIDByMetricID(tsid, metricID); err != nil {
				if err == io.EOF {
					// Cannot find TSID for the given metricID.
					// This may be the case on incomplete indexDB
					// due to snapshot or due to unflushed entries.
					// Just increment errors counter and skip it for now.
					atomic.AddUint64(&is.db.missingTSIDsForMetricID, 1)
					err = nil
				err = fmt.Errorf("cannot find tsid for metricID=%d: %w", metricID, err)
			is.db.putToMetricIDCache(metricID, tsid)
	if err != nil {
		return nil, fmt.Errorf("error when searching for TSIDs by metricIDs in the previous indexdb: %w", err)
	qt.Printf("found %d tsids for %d metricIDs in the previous indexdb", i-tsidsFound, len(extMetricIDs))

	tsids = tsids[:i]
	qt.Printf("load %d tsids for %d metricIDs from both current and previous indexdb", len(tsids), len(metricIDs))

	// Sort the found tsids, since they must be passed to TSID search
	// in the sorted order.
	sort.Slice(tsids, func(i, j int) bool { return tsids[i].Less(&tsids[j]) })
	qt.Printf("sort %d tsids", len(tsids))
	return tsids, nil

var tagFiltersKeyBufPool bytesutil.ByteBufferPool

func (is *indexSearch) getTSIDByMetricName(dst *TSID, metricName []byte) error {
	dmis := is.db.s.getDeletedMetricIDs()
	ts := &is.ts
	kb := &is.kb
	kb.B = append(kb.B[:0], nsPrefixMetricNameToTSID)
	kb.B = append(kb.B, metricName...)
	kb.B = append(kb.B, kvSeparatorChar)
	for ts.NextItem() {
		if !bytes.HasPrefix(ts.Item, kb.B) {
			// Nothing found.
			return io.EOF
		v := ts.Item[len(kb.B):]
		tail, err := dst.Unmarshal(v)
		if err != nil {
			return fmt.Errorf("cannot unmarshal TSID: %w", err)
		if len(tail) > 0 {
			return fmt.Errorf("unexpected non-empty tail left after unmarshaling TSID: %X", tail)
		if dmis.Len() > 0 {
			// Verify whether the dst is marked as deleted.
			if dmis.Has(dst.MetricID) {
				// The dst is deleted. Continue searching.
		// Found valid dst.
		return nil
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error when searching TSID by metricName; searchPrefix %q: %w", kb.B, err)
	// Nothing found
	return io.EOF

func (is *indexSearch) searchMetricNameWithCache(dst []byte, metricID uint64) ([]byte, error) {
	metricName := is.db.getMetricNameFromCache(dst, metricID)
	if len(metricName) > len(dst) {
		return metricName, nil
	var err error
	dst, err = is.searchMetricName(dst, metricID)
	if err == nil {
		// There is no need in verifying whether the given metricID is deleted,
		// since the filtering must be performed before calling this func.
		is.db.putMetricNameToCache(metricID, dst)
		return dst, nil
	return dst, err

func (is *indexSearch) searchMetricName(dst []byte, metricID uint64) ([]byte, error) {
	ts := &is.ts
	kb := &is.kb
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToMetricName)
	kb.B = encoding.MarshalUint64(kb.B, metricID)
	if err := ts.FirstItemWithPrefix(kb.B); err != nil {
		if err == io.EOF {
			return dst, err
		return dst, fmt.Errorf("error when searching metricName by metricID; searchPrefix %q: %w", kb.B, err)
	v := ts.Item[len(kb.B):]
	dst = append(dst, v...)
	return dst, nil

func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
	ts := &is.ts
	kb := &is.kb

	// Verify whether the tr.MinTimestamp is included into `ts` or is smaller than the minimum date stored in `ts`.
	// Do not check whether tr.MaxTimestamp is included into `ts` or is bigger than the max date stored in `ts` for performance reasons.
	// This means that containsTimeRange() can return true if `tr` is located below the min date stored in `ts`.
	// This is OK, since this case isn't encountered too much in practice.
	// The main practical case allows skipping searching in prev indexdb (`ts`) when `tr`
	// is located above the max date stored there.
	minDate := uint64(tr.MinTimestamp) / msecPerDay
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
	prefix := kb.B
	kb.B = encoding.MarshalUint64(kb.B, minDate)
	if !ts.NextItem() {
		if err := ts.Error(); err != nil {
			return false, fmt.Errorf("error when searching for minDate=%d, prefix %q: %w", minDate, kb.B, err)
		return false, nil
	if !bytes.HasPrefix(ts.Item, prefix) {
		// minDate exceeds max date from ts.
		return false, nil
	return true, nil

func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
	// There is no need in checking for deleted metricIDs here, since they
	// must be checked by the caller.
	ts := &is.ts
	kb := &is.kb
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID)
	kb.B = encoding.MarshalUint64(kb.B, metricID)
	if err := ts.FirstItemWithPrefix(kb.B); err != nil {
		if err == io.EOF {
			return err
		return fmt.Errorf("error when searching TSID by metricID; searchPrefix %q: %w", kb.B, err)
	v := ts.Item[len(kb.B):]
	tail, err := dst.Unmarshal(v)
	if err != nil {
		return fmt.Errorf("cannot unmarshal TSID=%X: %w", v, err)
	if len(tail) > 0 {
		return fmt.Errorf("unexpected non-zero tail left after unmarshaling TSID: %X", tail)
	return nil

// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
// and adds matching metrics to metricIDs.
func (is *indexSearch) updateMetricIDsByMetricNameMatch(qt *querytracer.Tracer, metricIDs, srcMetricIDs *uint64set.Set, tfs []*tagFilter) error {
	qt = qt.NewChild("filter out %d metric ids with filters=%s", srcMetricIDs.Len(), tfs)
	defer qt.Done()

	// sort srcMetricIDs in order to speed up Seek below.
	sortedMetricIDs := srcMetricIDs.AppendTo(nil)
	qt.Printf("sort %d metric ids", len(sortedMetricIDs))

	kb := &is.kb
	kb.B = is.marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
	tfs = removeCompositeTagFilters(tfs, kb.B)

	metricName := kbPool.Get()
	defer kbPool.Put(metricName)
	mn := GetMetricName()
	defer PutMetricName(mn)
	for loopsPaceLimiter, metricID := range sortedMetricIDs {
		if loopsPaceLimiter&paceLimiterSlowIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
		var err error
		metricName.B, err = is.searchMetricNameWithCache(metricName.B[:0], metricID)
		if err != nil {
			if err == io.EOF {
				// It is likely the metricID->metricName entry didn't propagate to inverted index yet.
				// Skip this metricID for now.
			return fmt.Errorf("cannot find metricName by metricID %d: %w", metricID, err)
		if err := mn.Unmarshal(metricName.B); err != nil {
			return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName.B, err)

		// Match the mn against tfs.
		ok, err := matchTagFilters(mn, tfs, &is.kb)
		if err != nil {
			return fmt.Errorf("cannot match MetricName %s against tagFilters: %w", mn, err)
		if !ok {
	qt.Printf("apply filters %s; resulting metric ids: %d", tfs, metricIDs.Len())
	return nil

func removeCompositeTagFilters(tfs []*tagFilter, prefix []byte) []*tagFilter {
	if !hasCompositeTagFilters(tfs, prefix) {
		return tfs
	var tagKey []byte
	var name []byte
	tfsNew := make([]*tagFilter, 0, len(tfs)+1)
	for _, tf := range tfs {
		if !bytes.HasPrefix(tf.prefix, prefix) {
			tfsNew = append(tfsNew, tf)
		suffix := tf.prefix[len(prefix):]
		var err error
		_, tagKey, err = unmarshalTagValue(tagKey[:0], suffix)
		if err != nil {
			logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err)
		if len(tagKey) == 0 || tagKey[0] != compositeTagKeyPrefix {
			tfsNew = append(tfsNew, tf)
		tagKey = tagKey[1:]
		var nameLen uint64
		tagKey, nameLen, err = encoding.UnmarshalVarUint64(tagKey)
		if err != nil {
			logger.Panicf("BUG: cannot unmarshal nameLen from tagKey %q: %s", tagKey, err)
		if nameLen == 0 {
			logger.Panicf("BUG: nameLen must be greater than 0")
		if uint64(len(tagKey)) < nameLen {
			logger.Panicf("BUG: expecting at %d bytes for name in tagKey=%q; got %d bytes", nameLen, tagKey, len(tagKey))
		name = append(name[:0], tagKey[:nameLen]...)
		tagKey = tagKey[nameLen:]
		var tfNew tagFilter
		if err := tfNew.Init(prefix, tagKey, tf.value, tf.isNegative, tf.isRegexp); err != nil {
			logger.Panicf("BUG: cannot initialize {%s=%q} filter: %s", tagKey, tf.value, err)
		tfsNew = append(tfsNew, &tfNew)
	if len(name) > 0 {
		var tfNew tagFilter
		if err := tfNew.Init(prefix, nil, name, false, false); err != nil {
			logger.Panicf("BUG: unexpected error when initializing {__name__=%q} filter: %s", name, err)
		tfsNew = append(tfsNew, &tfNew)
	return tfsNew

func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool {
	var tagKey []byte
	for _, tf := range tfs {
		if !bytes.HasPrefix(tf.prefix, prefix) {
		suffix := tf.prefix[len(prefix):]
		var err error
		_, tagKey, err = unmarshalTagValue(tagKey[:0], suffix)
		if err != nil {
			logger.Panicf("BUG: cannot unmarshal tag key from suffix=%q: %s", suffix, err)
		if len(tagKey) > 0 && tagKey[0] == compositeTagKeyPrefix {
			return true
	return false

func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
	for i, tf := range tfs {
		if bytes.Equal(tf.key, graphiteReverseTagKey) {
			// Skip artificial tag filter for Graphite-like metric names with dots,
			// since mn doesn't contain the corresponding tag.
		if len(tf.key) == 0 || string(tf.key) == "__graphite__" {
			// Match against mn.MetricGroup.
			b := marshalTagValue(kb.B, nil)
			b = marshalTagValue(b, mn.MetricGroup)
			kb.B = b[:len(kb.B)]
			ok, err := tf.match(b)
			if err != nil {
				return false, fmt.Errorf("cannot match MetricGroup %q with tagFilter %s: %w", mn.MetricGroup, tf, err)
			if !ok {
				// Move failed tf to start.
				// This should reduce the amount of useless work for the next mn.
				if i > 0 {
					tfs[0], tfs[i] = tfs[i], tfs[0]
				return false, nil
		// Search for matching tag name.
		tagMatched := false
		tagSeen := false
		for _, tag := range mn.Tags {
			if string(tag.Key) != string(tf.key) {

			// Found the matching tag name. Match the value.
			tagSeen = true
			b := tag.Marshal(kb.B)
			kb.B = b[:len(kb.B)]
			ok, err := tf.match(b)
			if err != nil {
				return false, fmt.Errorf("cannot match tag %q with tagFilter %s: %w", tag, tf, err)
			if !ok {
				// Move failed tf to start.
				// This should reduce the amount of useless work for the next mn.
				if i > 0 {
					tfs[0], tfs[i] = tfs[i], tfs[0]
				return false, nil
			tagMatched = true
		if !tagSeen && (!tf.isNegative && tf.isEmptyMatch || tf.isNegative && !tf.isEmptyMatch) {
			// tf contains positive empty-match filter for non-existing tag key, i.e.
			// {non_existing_tag_key=~"foobar|"}
			// OR
			// tf contains negative filter for non-exsisting tag key
			// and this filter doesn't match empty string, i.e. {non_existing_tag_key!="foobar"}
			// Such filter matches anything.
			// Note that the filter `{non_existing_tag_key!~"|foobar"}` shouldn't match anything,
			// since it is expected that it matches non-empty `non_existing_tag_key`.
			// See and
			// for details.
		if tagMatched {
			// tf matches mn. Go to the next tf.
		// Matching tag name wasn't found.
		// Move failed tf to start.
		// This should reduce the amount of useless work for the next mn.
		if i > 0 {
			tfs[0], tfs[i] = tfs[i], tfs[0]
		return false, nil
	return true, nil

func (is *indexSearch) searchMetricIDsWithFiltersOnDate(qt *querytracer.Tracer, tfss []*TagFilters, date uint64, maxMetrics int) (*uint64set.Set, error) {
	if len(tfss) == 0 {
		return nil, nil
	tr := TimeRange{
		MinTimestamp: int64(date) * msecPerDay,
		MaxTimestamp: int64(date+1)*msecPerDay - 1,
	if date == 0 {
		// Search for metricIDs on the whole time range.
		tr.MaxTimestamp = timestampFromTime(time.Now())
	metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
	if err != nil {
		return nil, err
	return metricIDs, nil

func (is *indexSearch) searchMetricIDs(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) ([]uint64, error) {
	ok, err := is.containsTimeRange(tr)
	if err != nil {
		return nil, err
	if !ok {
		// Fast path - the index doesn't contain data for the given tr.
		return nil, nil
	metricIDs, err := is.searchMetricIDsInternal(qt, tfss, tr, maxMetrics)
	if err != nil {
		return nil, err
	if metricIDs.Len() == 0 {
		// Nothing found
		return nil, nil

	sortedMetricIDs := metricIDs.AppendTo(nil)
	qt.Printf("sort %d matching metric ids", len(sortedMetricIDs))

	// Filter out deleted metricIDs.
	dmis := is.db.s.getDeletedMetricIDs()
	if dmis.Len() > 0 {
		metricIDsFiltered := sortedMetricIDs[:0]
		for _, metricID := range sortedMetricIDs {
			if !dmis.Has(metricID) {
				metricIDsFiltered = append(metricIDsFiltered, metricID)
		qt.Printf("left %d metric ids after removing deleted metric ids", len(metricIDsFiltered))
		sortedMetricIDs = metricIDsFiltered

	return sortedMetricIDs, nil

func (is *indexSearch) searchMetricIDsInternal(qt *querytracer.Tracer, tfss []*TagFilters, tr TimeRange, maxMetrics int) (*uint64set.Set, error) {
	qt = qt.NewChild("search for metric ids: filters=%s, timeRange=%s, maxMetrics=%d", tfss, &tr, maxMetrics)
	defer qt.Done()
	metricIDs := &uint64set.Set{}
	for _, tfs := range tfss {
		if len(tfs.tfs) == 0 {
			// An empty filters must be equivalent to `{__name__!=""}`
			tfs = NewTagFilters()
			if err := tfs.Add(nil, nil, true, false); err != nil {
				logger.Panicf(`BUG: cannot add {__name__!=""} filter: %s`, err)
		qtChild := qt.NewChild("update metric ids: filters=%s, timeRange=%s", tfs, &tr)
		prevMetricIDsLen := metricIDs.Len()
		err := is.updateMetricIDsForTagFilters(qtChild, metricIDs, tfs, tr, maxMetrics+1)
		qtChild.Donef("updated %d metric ids", metricIDs.Len()-prevMetricIDsLen)
		if err != nil {
			return nil, err
		if metricIDs.Len() > maxMetrics {
			return nil, fmt.Errorf("the number of matching timeseries exceeds %d; either narrow down the search "+
				"or increase -search.max* command-line flag values at vmselect; see", maxMetrics)
	return metricIDs, nil

func (is *indexSearch) updateMetricIDsForTagFilters(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
	err := is.tryUpdatingMetricIDsForDateRange(qt, metricIDs, tfs, tr, maxMetrics)
	if err == nil {
		// Fast path: found metricIDs by date range.
		return nil
	if !errors.Is(err, errFallbackToGlobalSearch) {
		return err

	// Slow path - fall back to search in the global inverted index.
	qt.Printf("cannot find metric ids in per-day index; fall back to global index")
	atomic.AddUint64(&is.db.globalSearchCalls, 1)
	m, err := is.getMetricIDsForDateAndFilters(qt, 0, tfs, maxMetrics)
	if err != nil {
		if errors.Is(err, errFallbackToGlobalSearch) {
			return fmt.Errorf("the number of matching timeseries exceeds %d; either narrow down the search "+
				"or increase -search.max* command-line flag values at vmselect", maxMetrics)
		return err
	return nil

func (is *indexSearch) getMetricIDsForTagFilter(qt *querytracer.Tracer, tf *tagFilter, maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
	if tf.isNegative {
		logger.Panicf("BUG: isNegative must be false")
	metricIDs := &uint64set.Set{}
	if len(tf.orSuffixes) > 0 {
		// Fast path for orSuffixes - seek for rows for each value from orSuffixes.
		loopsCount, err := is.updateMetricIDsForOrSuffixes(tf, metricIDs, maxMetrics, maxLoopsCount)
		qt.Printf("found %d metric ids for filter={%s} using exact search; spent %d loops", metricIDs.Len(), tf, loopsCount)
		if err != nil {
			return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in fast path: %w; tagFilter=%s", err, tf)
		return metricIDs, loopsCount, nil

	// Slow path - scan for all the rows with the given prefix.
	loopsCount, err := is.getMetricIDsForTagFilterSlow(tf, metricIDs.Add, maxLoopsCount)
	qt.Printf("found %d metric ids for filter={%s} using prefix search; spent %d loops", metricIDs.Len(), tf, loopsCount)
	if err != nil {
		return nil, loopsCount, fmt.Errorf("error when searching for metricIDs for tagFilter in slow path: %w; tagFilter=%s", err, tf)
	return metricIDs, loopsCount, nil

var errTooManyLoops = fmt.Errorf("too many loops is needed for applying this filter")

func (is *indexSearch) getMetricIDsForTagFilterSlow(tf *tagFilter, f func(metricID uint64), maxLoopsCount int64) (int64, error) {
	if len(tf.orSuffixes) > 0 {
		logger.Panicf("BUG: the getMetricIDsForTagFilterSlow must be called only for empty tf.orSuffixes; got %s", tf.orSuffixes)

	// Scan all the rows with tf.prefix and call f on every tf match.
	ts := &is.ts
	kb := &is.kb
	mp := &
	var prevMatchingSuffix []byte
	var prevMatch bool
	var loopsCount int64
	loopsPaceLimiter := 0
	prefix := tf.prefix
	for ts.NextItem() {
		if loopsPaceLimiter&paceLimiterMediumIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return loopsCount, err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			return loopsCount, nil
		tail := item[len(prefix):]
		n := bytes.IndexByte(tail, tagSeparatorChar)
		if n < 0 {
			return loopsCount, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar=%d", item, tagSeparatorChar)
		suffix := tail[:n+1]
		tail = tail[n+1:]
		if err := mp.InitOnlyTail(item, tail); err != nil {
			return loopsCount, err
		loopsCount += int64(mp.MetricIDsLen())
		if loopsCount > maxLoopsCount {
			return loopsCount, errTooManyLoops
		if prevMatch && string(suffix) == string(prevMatchingSuffix) {
			// Fast path: the same tag value found.
			// There is no need in checking it again with potentially
			// slow tf.matchSuffix, which may call regexp.
			for _, metricID := range mp.MetricIDs {
		// Slow path: need tf.matchSuffix call.
		ok, err := tf.matchSuffix(suffix)
		// Assume that tf.matchSuffix call needs 10x more time than a single metric scan iteration.
		loopsCount += 10 * int64(tf.matchCost)
		if err != nil {
			return loopsCount, fmt.Errorf("error when matching %s against suffix %q: %w", tf, suffix, err)
		if !ok {
			prevMatch = false
			if mp.MetricIDsLen() < maxMetricIDsPerRow/2 {
				// If the current row contains non-full metricIDs list,
				// then it is likely the next row contains the next tag value.
				// So skip seeking for the next tag value, since it will be slower than just ts.NextItem call.
			// Optimization: skip all the metricIDs for the given tag value
			kb.B = append(kb.B[:0], item[:len(item)-len(tail)]...)
			// The last char in kb.B must be tagSeparatorChar. Just increment it
			// in order to jump to the next tag value.
			if len(kb.B) == 0 || kb.B[len(kb.B)-1] != tagSeparatorChar || tagSeparatorChar >= 0xff {
				return loopsCount, fmt.Errorf("data corruption: the last char in k=%X must be %X", kb.B, tagSeparatorChar)
			// Assume that a seek cost is equivalent to 1000 ordinary loops.
			loopsCount += 1000
		prevMatch = true
		prevMatchingSuffix = append(prevMatchingSuffix[:0], suffix...)
		for _, metricID := range mp.MetricIDs {
	if err := ts.Error(); err != nil {
		return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
	return loopsCount, nil

func (is *indexSearch) updateMetricIDsForOrSuffixes(tf *tagFilter, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
	if tf.isNegative {
		logger.Panicf("BUG: isNegative must be false")
	kb := kbPool.Get()
	defer kbPool.Put(kb)
	var loopsCount int64
	for _, orSuffix := range tf.orSuffixes {
		kb.B = append(kb.B[:0], tf.prefix...)
		kb.B = append(kb.B, orSuffix...)
		kb.B = append(kb.B, tagSeparatorChar)
		lc, err := is.updateMetricIDsForOrSuffix(kb.B, metricIDs, maxMetrics, maxLoopsCount-loopsCount)
		loopsCount += lc
		if err != nil {
			return loopsCount, err
		if metricIDs.Len() >= maxMetrics {
			return loopsCount, nil
	return loopsCount, nil

func (is *indexSearch) updateMetricIDsForOrSuffix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int, maxLoopsCount int64) (int64, error) {
	ts := &is.ts
	mp := &
	var loopsCount int64
	loopsPaceLimiter := 0
	for metricIDs.Len() < maxMetrics && ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return loopsCount, err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			return loopsCount, nil
		if err := mp.InitOnlyTail(item, item[len(prefix):]); err != nil {
			return loopsCount, err
		loopsCount += int64(mp.MetricIDsLen())
		if loopsCount > maxLoopsCount {
			return loopsCount, errTooManyLoops
	if err := ts.Error(); err != nil {
		return loopsCount, fmt.Errorf("error when searching for tag filter prefix %q: %w", prefix, err)
	return loopsCount, nil

var errFallbackToGlobalSearch = errors.New("fall back from per-day index search to global index search")

const maxDaysForPerDaySearch = 40

func (is *indexSearch) tryUpdatingMetricIDsForDateRange(qt *querytracer.Tracer, metricIDs *uint64set.Set, tfs *TagFilters, tr TimeRange, maxMetrics int) error {
	atomic.AddUint64(&is.db.dateRangeSearchCalls, 1)
	minDate := uint64(tr.MinTimestamp) / msecPerDay
	maxDate := uint64(tr.MaxTimestamp-1) / msecPerDay
	if minDate > maxDate || maxDate-minDate > maxDaysForPerDaySearch {
		// Too much dates must be covered. Give up, since it may be slow.
		return errFallbackToGlobalSearch
	if minDate == maxDate {
		// Fast path - query only a single date.
		m, err := is.getMetricIDsForDateAndFilters(qt, minDate, tfs, maxMetrics)
		if err != nil {
			return err
		atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
		return nil

	// Slower path - search for metricIDs for each day in parallel.
	qt = qt.NewChild("parallel search for metric ids in per-day index: filters=%s, dayRange=[%d..%d]", tfs, minDate, maxDate)
	defer qt.Done()
	wg := getWaitGroup()
	var errGlobal error
	var mu sync.Mutex // protects metricIDs + errGlobal vars from concurrent access below
	for minDate <= maxDate {
		qtChild := qt.NewChild("parallel thread for date=%s", dateToString(minDate))
		go func(date uint64) {
			defer func() {
			isLocal := is.db.getIndexSearch(is.deadline)
			m, err := isLocal.getMetricIDsForDateAndFilters(qtChild, date, tfs, maxMetrics)
			defer mu.Unlock()
			if errGlobal != nil {
			if err != nil {
				dateStr := time.Unix(int64(date*24*3600), 0)
				errGlobal = fmt.Errorf("cannot search for metricIDs at %s: %w", dateStr, err)
			if metricIDs.Len() < maxMetrics {
	if errGlobal != nil {
		return errGlobal
	atomic.AddUint64(&is.db.dateRangeSearchHits, 1)
	return nil

func (is *indexSearch) getMetricIDsForDateAndFilters(qt *querytracer.Tracer, date uint64, tfs *TagFilters, maxMetrics int) (*uint64set.Set, error) {
	if qt.Enabled() {
		qt = qt.NewChild("search for metric ids on a particular day: filters=%s, date=%s, maxMetrics=%d", tfs, dateToString(date), maxMetrics)
		defer qt.Done()
	// Sort tfs by loopsCount needed for performing each filter.
	// This stats is usually collected from the previous queries.
	// This way we limit the amount of work below by applying fast filters at first.
	type tagFilterWithWeight struct {
		tf               *tagFilter
		loopsCount       int64
		filterLoopsCount int64
	tfws := make([]tagFilterWithWeight, len(tfs.tfs))
	currentTime := fasttime.UnixTimestamp()
	for i := range tfs.tfs {
		tf := &tfs.tfs[i]
		loopsCount, filterLoopsCount, timestamp := is.getLoopsCountAndTimestampForDateFilter(date, tf)
		if currentTime > timestamp+3600 {
			// Update stats once per hour for relatively fast tag filters.
			// There is no need in spending CPU resources on updating stats for heavy tag filters.
			if loopsCount <= 10e6 {
				loopsCount = 0
			if filterLoopsCount <= 10e6 {
				filterLoopsCount = 0
		tfws[i] = tagFilterWithWeight{
			tf:               tf,
			loopsCount:       loopsCount,
			filterLoopsCount: filterLoopsCount,
	sort.Slice(tfws, func(i, j int) bool {
		a, b := &tfws[i], &tfws[j]
		if a.loopsCount != b.loopsCount {
			return a.loopsCount < b.loopsCount
	getFirstPositiveLoopsCount := func(tfws []tagFilterWithWeight) int64 {
		for i := range tfws {
			if n := tfws[i].loopsCount; n > 0 {
				return n
		return int64Max
	storeLoopsCount := func(tfw *tagFilterWithWeight, loopsCount int64) {
		if loopsCount != tfw.loopsCount {
			tfw.loopsCount = loopsCount
			is.storeLoopsCountForDateFilter(date,, tfw.loopsCount, tfw.filterLoopsCount)

	// Populate metricIDs for the first non-negative filter with the smallest cost.
	qtChild := qt.NewChild("search for the first non-negative filter with the smallest cost")
	var metricIDs *uint64set.Set
	tfwsRemaining := tfws[:0]
	maxDateMetrics := intMax
	if maxMetrics < intMax/50 {
		maxDateMetrics = maxMetrics * 50
	for i, tfw := range tfws {
		tf :=
		if tf.isNegative || tf.isEmptyMatch {
			tfwsRemaining = append(tfwsRemaining, tfw)
		maxLoopsCount := getFirstPositiveLoopsCount(tfws[i+1:])
		m, loopsCount, err := is.getMetricIDsForDateTagFilter(qtChild, tf, date, tfs.commonPrefix, maxDateMetrics, maxLoopsCount)
		if err != nil {
			if errors.Is(err, errTooManyLoops) {
				// The tf took too many loops compared to the next filter. Postpone applying this filter.
				qtChild.Printf("the filter={%s} took more than %d loops; postpone it", tf, maxLoopsCount)
				storeLoopsCount(&tfw, 2*loopsCount)
				tfwsRemaining = append(tfwsRemaining, tfw)
			// Move failing filter to the end of filter list.
			storeLoopsCount(&tfw, int64Max)
			return nil, err
		if m.Len() >= maxDateMetrics {
			// Too many time series found by a single tag filter. Move the filter to the end of list.
			qtChild.Printf("the filter={%s} matches at least %d series; postpone it", tf, maxDateMetrics)
			storeLoopsCount(&tfw, int64Max-1)
			tfwsRemaining = append(tfwsRemaining, tfw)
		storeLoopsCount(&tfw, loopsCount)
		metricIDs = m
		tfwsRemaining = append(tfwsRemaining, tfws[i+1:]...)
		qtChild.Printf("the filter={%s} matches less than %d series (actually %d series); use it", tf, maxDateMetrics, metricIDs.Len())
	tfws = tfwsRemaining

	if metricIDs == nil {
		// All the filters in tfs are negative or match too many time series.
		// Populate all the metricIDs for the given (date),
		// so later they can be filtered out with negative filters.
		qt.Printf("all the filters are negative or match more than %d time series; fall back to searching for all the metric ids", maxDateMetrics)
		m, err := is.getMetricIDsForDate(date, maxDateMetrics)
		if err != nil {
			return nil, fmt.Errorf("cannot obtain all the metricIDs: %w", err)
		if m.Len() >= maxDateMetrics {
			// Too many time series found for the given (date). Fall back to global search.
			return nil, errFallbackToGlobalSearch
		metricIDs = m
		qt.Printf("found %d metric ids", metricIDs.Len())

	sort.Slice(tfws, func(i, j int) bool {
		a, b := &tfws[i], &tfws[j]
		if a.filterLoopsCount != b.filterLoopsCount {
			return a.filterLoopsCount < b.filterLoopsCount
	getFirstPositiveFilterLoopsCount := func(tfws []tagFilterWithWeight) int64 {
		for i := range tfws {
			if n := tfws[i].filterLoopsCount; n > 0 {
				return n
		return int64Max
	storeFilterLoopsCount := func(tfw *tagFilterWithWeight, filterLoopsCount int64) {
		if filterLoopsCount != tfw.filterLoopsCount {
			is.storeLoopsCountForDateFilter(date,, tfw.loopsCount, filterLoopsCount)

	// Intersect metricIDs with the rest of filters.
	// Do not run these tag filters in parallel, since this may result in CPU and RAM waste
	// when the initial tag filters significantly reduce the number of found metricIDs,
	// so the remaining filters could be performed via much faster metricName matching instead
	// of slow selecting of matching metricIDs.
	qtChild = qt.NewChild("intersect the remaining %d filters with the found %d metric ids", len(tfws), metricIDs.Len())
	var tfsPostponed []*tagFilter
	for i, tfw := range tfws {
		tf :=
		metricIDsLen := metricIDs.Len()
		if metricIDsLen == 0 {
			// There is no need in applying the remaining filters to an empty set.
		if tfw.filterLoopsCount > int64(metricIDsLen)*loopsCountPerMetricNameMatch {
			// It should be faster performing metricName match on the remaining filters
			// instead of scanning big number of entries in the inverted index for these filters.
			for _, tfw := range tfws[i:] {
				tfsPostponed = append(tfsPostponed,
		maxLoopsCount := getFirstPositiveFilterLoopsCount(tfws[i+1:])
		if maxLoopsCount == int64Max {
			maxLoopsCount = int64(metricIDsLen) * loopsCountPerMetricNameMatch
		m, filterLoopsCount, err := is.getMetricIDsForDateTagFilter(qtChild, tf, date, tfs.commonPrefix, intMax, maxLoopsCount)
		if err != nil {
			if errors.Is(err, errTooManyLoops) {
				// Postpone tf, since it took more loops than the next filter may need.
				qtChild.Printf("postpone filter={%s}, since it took more than %d loops", tf, maxLoopsCount)
				storeFilterLoopsCount(&tfw, 2*filterLoopsCount)
				tfsPostponed = append(tfsPostponed, tf)
			// Move failing tf to the end of filter list
			storeFilterLoopsCount(&tfw, int64Max)
			return nil, err
		storeFilterLoopsCount(&tfw, filterLoopsCount)
		if tf.isNegative || tf.isEmptyMatch {
			qtChild.Printf("subtract %d metric ids from the found %d metric ids for filter={%s}; resulting metric ids: %d", m.Len(), metricIDsLen, tf, metricIDs.Len())
		} else {
			qtChild.Printf("intersect %d metric ids with the found %d metric ids for filter={%s}; resulting metric ids: %d", m.Len(), metricIDsLen, tf, metricIDs.Len())
	if metricIDs.Len() == 0 {
		// There is no need in applying tfsPostponed, since the result is empty.
		qt.Printf("found zero metric ids")
		return nil, nil
	if len(tfsPostponed) > 0 {
		// Apply the postponed filters via metricName match.
		qt.Printf("apply postponed filters=%s to %d metrics ids", tfsPostponed, metricIDs.Len())
		var m uint64set.Set
		if err := is.updateMetricIDsByMetricNameMatch(qt, &m, metricIDs, tfsPostponed); err != nil {
			return nil, err
		return &m, nil
	qt.Printf("found %d metric ids", metricIDs.Len())
	return metricIDs, nil

const (
	intMax   = int((^uint(0)) >> 1)
	int64Max = int64((1 << 63) - 1)

func (is *indexSearch) createPerDayIndexes(date, metricID uint64, mn *MetricName) {
	ii := getIndexItems()
	defer putIndexItems(ii)

	ii.B = marshalCommonPrefix(ii.B, nsPrefixDateToMetricID)
	ii.B = encoding.MarshalUint64(ii.B, date)
	ii.B = encoding.MarshalUint64(ii.B, metricID)

	// Create per-day inverted index entries for metricID.
	kb := kbPool.Get()
	defer kbPool.Put(kb)
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateTagToMetricIDs)
	kb.B = encoding.MarshalUint64(kb.B, date)
	ii.registerTagIndexes(kb.B, mn, metricID)
	is.db.s.dateMetricIDCache.Set(date, metricID)

func (ii *indexItems) registerTagIndexes(prefix []byte, mn *MetricName, metricID uint64) {
	// Add index entry for MetricGroup -> MetricID
	ii.B = append(ii.B, prefix...)
	ii.B = marshalTagValue(ii.B, nil)
	ii.B = marshalTagValue(ii.B, mn.MetricGroup)
	ii.B = encoding.MarshalUint64(ii.B, metricID)
	ii.addReverseMetricGroupIfNeeded(prefix, mn, metricID)

	// Add index entries for tags: tag -> MetricID
	for _, tag := range mn.Tags {
		ii.B = append(ii.B, prefix...)
		ii.B = tag.Marshal(ii.B)
		ii.B = encoding.MarshalUint64(ii.B, metricID)

	// Add index entries for composite tags: MetricGroup+tag -> MetricID
	compositeKey := kbPool.Get()
	for _, tag := range mn.Tags {
		compositeKey.B = marshalCompositeTagKey(compositeKey.B[:0], mn.MetricGroup, tag.Key)
		ii.B = append(ii.B, prefix...)
		ii.B = marshalTagValue(ii.B, compositeKey.B)
		ii.B = marshalTagValue(ii.B, tag.Value)
		ii.B = encoding.MarshalUint64(ii.B, metricID)

func (ii *indexItems) addReverseMetricGroupIfNeeded(prefix []byte, mn *MetricName, metricID uint64) {
	if bytes.IndexByte(mn.MetricGroup, '.') < 0 {
		// The reverse metric group is needed only for Graphite-like metrics with points.
	// This is most likely a Graphite metric like ''.
	// Store reverse metric name 'zab.rab.oof' in order to speed up search for '*.bar.baz'
	// when the Graphite wildcard has a suffix matching small number of time series.
	ii.B = append(ii.B, prefix...)
	ii.B = marshalTagValue(ii.B, graphiteReverseTagKey)
	revBuf := kbPool.Get()
	revBuf.B = reverseBytes(revBuf.B[:0], mn.MetricGroup)
	ii.B = marshalTagValue(ii.B, revBuf.B)
	ii.B = encoding.MarshalUint64(ii.B, metricID)

func isArtificialTagKey(key []byte) bool {
	if bytes.Equal(key, graphiteReverseTagKey) {
		return true
	if len(key) > 0 && key[0] == compositeTagKeyPrefix {
		return true
	return false

// The tag key for reverse metric name used for speeding up searching
// for Graphite wildcards with suffix matching small number of time series,
// i.e. '*.bar.baz'.
// It is expected that the given key isn't be used by users.
var graphiteReverseTagKey = []byte("\xff")

// The prefix for composite tag, which is used for speeding up searching
// for composite filters, which contain `{__name__="<metric_name>"}` filter.
// It is expected that the given prefix isn't used by users.
const compositeTagKeyPrefix = '\xfe'

func marshalCompositeTagKey(dst, name, key []byte) []byte {
	dst = append(dst, compositeTagKeyPrefix)
	dst = encoding.MarshalVarUint64(dst, uint64(len(name)))
	dst = append(dst, name...)
	dst = append(dst, key...)
	return dst

func unmarshalCompositeTagKey(src []byte) ([]byte, []byte, error) {
	if len(src) == 0 {
		return nil, nil, fmt.Errorf("composite tag key cannot be empty")
	if src[0] != compositeTagKeyPrefix {
		return nil, nil, fmt.Errorf("missing composite tag key prefix in %q", src)
	src = src[1:]
	tail, n, err := encoding.UnmarshalVarUint64(src)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot unmarshal metric name length from composite tag key: %w", err)
	src = tail
	if uint64(len(src)) < n {
		return nil, nil, fmt.Errorf("missing metric name with length %d in composite tag key %q", n, src)
	name := src[:n]
	key := src[n:]
	return name, key, nil

func reverseBytes(dst, src []byte) []byte {
	for i := len(src) - 1; i >= 0; i-- {
		dst = append(dst, src[i])
	return dst

func (is *indexSearch) hasDateMetricID(date, metricID uint64) (bool, error) {
	ts := &is.ts
	kb := &is.kb
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixDateToMetricID)
	kb.B = encoding.MarshalUint64(kb.B, date)
	kb.B = encoding.MarshalUint64(kb.B, metricID)
	if err := ts.FirstItemWithPrefix(kb.B); err != nil {
		if err == io.EOF {
			return false, nil
		return false, fmt.Errorf("error when searching for (date=%s, metricID=%d) entry: %w", dateToString(date), metricID, err)
	if string(ts.Item) != string(kb.B) {
		return false, fmt.Errorf("unexpected entry for (date=%s, metricID=%d); got %q; want %q", dateToString(date), metricID, ts.Item, kb.B)
	return true, nil

func (is *indexSearch) getMetricIDsForDateTagFilter(qt *querytracer.Tracer, tf *tagFilter, date uint64, commonPrefix []byte,
	maxMetrics int, maxLoopsCount int64) (*uint64set.Set, int64, error) {
	if qt.Enabled() {
		qt = qt.NewChild("get metric ids for filter and date: filter={%s}, date=%s, maxMetrics=%d, maxLoopsCount=%d", tf, dateToString(date), maxMetrics, maxLoopsCount)
		defer qt.Done()
	if !bytes.HasPrefix(tf.prefix, commonPrefix) {
		logger.Panicf("BUG: unexpected tf.prefix %q; must start with commonPrefix %q", tf.prefix, commonPrefix)
	kb := kbPool.Get()
	defer kbPool.Put(kb)
	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	prefix := kb.B
	kb.B = append(kb.B, tf.prefix[len(commonPrefix):]...)
	tfNew := *tf
	tfNew.isNegative = false // isNegative for the original tf is handled by the caller.
	tfNew.prefix = kb.B
	metricIDs, loopsCount, err := is.getMetricIDsForTagFilter(qt, &tfNew, maxMetrics, maxLoopsCount)
	if err != nil {
		return nil, loopsCount, err
	if tf.isNegative || !tf.isEmptyMatch {
		return metricIDs, loopsCount, nil
	// The tag filter, which matches empty label such as {foo=~"bar|"}
	// Convert it to negative filter, which matches {foo=~".+",foo!~"bar|"}.
	// This fixes
	// See also
	maxLoopsCount -= loopsCount
	var tfGross tagFilter
	if err := tfGross.Init(prefix, tf.key, []byte(".+"), false, true); err != nil {
		logger.Panicf(`BUG: cannot init tag filter: {%q=~".+"}: %s`, tf.key, err)
	m, lc, err := is.getMetricIDsForTagFilter(qt, &tfGross, maxMetrics, maxLoopsCount)
	loopsCount += lc
	if err != nil {
		return nil, loopsCount, err
	mLen := m.Len()
	qt.Printf("subtract %d metric ids for filter={%s} from %d metric ids for filter={%s}", metricIDs.Len(), &tfNew, mLen, &tfGross)
	qt.Printf("found %d metric ids, spent %d loops", m.Len(), loopsCount)
	return m, loopsCount, nil

func (is *indexSearch) getLoopsCountAndTimestampForDateFilter(date uint64, tf *tagFilter) (int64, int64, uint64) {
	is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0],, date, tf)
	kb := kbPool.Get()
	defer kbPool.Put(kb)
	kb.B = is.db.loopsPerDateTagFilterCache.Get(kb.B[:0], is.kb.B)
	if len(kb.B) != 3*8 {
		return 0, 0, 0
	loopsCount := encoding.UnmarshalInt64(kb.B)
	filterLoopsCount := encoding.UnmarshalInt64(kb.B[8:])
	timestamp := encoding.UnmarshalUint64(kb.B[16:])
	return loopsCount, filterLoopsCount, timestamp

func (is *indexSearch) storeLoopsCountForDateFilter(date uint64, tf *tagFilter, loopsCount, filterLoopsCount int64) {
	currentTimestamp := fasttime.UnixTimestamp()
	is.kb.B = appendDateTagFilterCacheKey(is.kb.B[:0],, date, tf)
	kb := kbPool.Get()
	kb.B = encoding.MarshalInt64(kb.B[:0], loopsCount)
	kb.B = encoding.MarshalInt64(kb.B, filterLoopsCount)
	kb.B = encoding.MarshalUint64(kb.B, currentTimestamp)
	is.db.loopsPerDateTagFilterCache.Set(is.kb.B, kb.B)

func appendDateTagFilterCacheKey(dst []byte, indexDBName string, date uint64, tf *tagFilter) []byte {
	dst = append(dst, indexDBName...)
	dst = encoding.MarshalUint64(dst, date)
	dst = tf.Marshal(dst)
	return dst

func (is *indexSearch) getMetricIDsForDate(date uint64, maxMetrics int) (*uint64set.Set, error) {
	// Extract all the metricIDs from (date, __name__=value)->metricIDs entries.
	kb := kbPool.Get()
	defer kbPool.Put(kb)
	kb.B = is.marshalCommonPrefixForDate(kb.B[:0], date)
	kb.B = marshalTagValue(kb.B, nil)
	var metricIDs uint64set.Set
	if err := is.updateMetricIDsForPrefix(kb.B, &metricIDs, maxMetrics); err != nil {
		return nil, err
	return &metricIDs, nil

func (is *indexSearch) updateMetricIDsForPrefix(prefix []byte, metricIDs *uint64set.Set, maxMetrics int) error {
	ts := &is.ts
	mp := &
	loopsPaceLimiter := 0
	for ts.NextItem() {
		if loopsPaceLimiter&paceLimiterFastIterationsMask == 0 {
			if err := checkSearchDeadlineAndPace(is.deadline); err != nil {
				return err
		item := ts.Item
		if !bytes.HasPrefix(item, prefix) {
			return nil
		tail := item[len(prefix):]
		n := bytes.IndexByte(tail, tagSeparatorChar)
		if n < 0 {
			return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
		tail = tail[n+1:]
		if err := mp.InitOnlyTail(item, tail); err != nil {
			return err
		if metricIDs.Len() >= maxMetrics {
			return nil
	if err := ts.Error(); err != nil {
		return fmt.Errorf("error when searching for all metricIDs by prefix %q: %w", prefix, err)
	return nil

// The estimated number of index scan loops a single loop in updateMetricIDsByMetricNameMatch takes.
const loopsCountPerMetricNameMatch = 150

var kbPool bytesutil.ByteBufferPool

// Returns local unique MetricID.
func generateUniqueMetricID() uint64 {
	// It is expected that metricIDs returned from this function must be dense.
	// If they will be sparse, then this may hurt metric_ids intersection
	// performance with uint64set.Set.
	return atomic.AddUint64(&nextUniqueMetricID, 1)

// This number mustn't go backwards on restarts, otherwise metricID
// collisions are possible. So don't change time on the server
// between VictoriaMetrics restarts.
var nextUniqueMetricID = uint64(time.Now().UnixNano())

func marshalCommonPrefix(dst []byte, nsPrefix byte) []byte {
	dst = append(dst, nsPrefix)
	return dst

// This function is needed only for minimizing the difference between code for single-node and cluster version.
func (is *indexSearch) marshalCommonPrefix(dst []byte, nsPrefix byte) []byte {
	return marshalCommonPrefix(dst, nsPrefix)

func (is *indexSearch) marshalCommonPrefixForDate(dst []byte, date uint64) []byte {
	if date == 0 {
		// Global index
		return is.marshalCommonPrefix(dst, nsPrefixTagToMetricIDs)
	// Per-day index
	dst = is.marshalCommonPrefix(dst, nsPrefixDateTagToMetricIDs)
	return encoding.MarshalUint64(dst, date)

func unmarshalCommonPrefix(src []byte) ([]byte, byte, error) {
	if len(src) < commonPrefixLen {
		return nil, 0, fmt.Errorf("cannot unmarshal common prefix from %d bytes; need at least %d bytes; data=%X", len(src), commonPrefixLen, src)
	prefix := src[0]
	return src[commonPrefixLen:], prefix, nil

// 1 byte for prefix
const commonPrefixLen = 1

type tagToMetricIDsRowParser struct {
	// NSPrefix contains the first byte parsed from the row after Init call.
	// This is either nsPrefixTagToMetricIDs or nsPrefixDateTagToMetricIDs.
	NSPrefix byte

	// Date contains parsed date for nsPrefixDateTagToMetricIDs rows after Init call
	Date uint64

	// MetricIDs contains parsed MetricIDs after ParseMetricIDs call
	MetricIDs []uint64

	// metricIDsParsed is set to true after ParseMetricIDs call
	metricIDsParsed bool

	// Tag contains parsed tag after Init call
	Tag Tag

	// tail contains the remaining unparsed metricIDs
	tail []byte

func (mp *tagToMetricIDsRowParser) Reset() {
	mp.NSPrefix = 0
	mp.Date = 0
	mp.MetricIDs = mp.MetricIDs[:0]
	mp.metricIDsParsed = false
	mp.tail = nil

// Init initializes mp from b, which should contain encoded tag->metricIDs row.
// b cannot be re-used until Reset call.
func (mp *tagToMetricIDsRowParser) Init(b []byte, nsPrefixExpected byte) error {
	tail, nsPrefix, err := unmarshalCommonPrefix(b)
	if err != nil {
		return fmt.Errorf("invalid tag->metricIDs row %q: %w", b, err)
	if nsPrefix != nsPrefixExpected {
		return fmt.Errorf("invalid prefix for tag->metricIDs row %q; got %d; want %d", b, nsPrefix, nsPrefixExpected)
	if nsPrefix == nsPrefixDateTagToMetricIDs {
		// unmarshal date.
		if len(tail) < 8 {
			return fmt.Errorf("cannot unmarshal date from (date, tag)->metricIDs row %q from %d bytes; want at least 8 bytes", b, len(tail))
		mp.Date = encoding.UnmarshalUint64(tail)
		tail = tail[8:]
	mp.NSPrefix = nsPrefix
	tail, err = mp.Tag.Unmarshal(tail)
	if err != nil {
		return fmt.Errorf("cannot unmarshal tag from tag->metricIDs row %q: %w", b, err)
	return mp.InitOnlyTail(b, tail)

// MarshalPrefix marshals row prefix without tail to dst.
func (mp *tagToMetricIDsRowParser) MarshalPrefix(dst []byte) []byte {
	dst = marshalCommonPrefix(dst, mp.NSPrefix)
	if mp.NSPrefix == nsPrefixDateTagToMetricIDs {
		dst = encoding.MarshalUint64(dst, mp.Date)
	dst = mp.Tag.Marshal(dst)
	return dst

// InitOnlyTail initializes mp.tail from tail.
// b must contain tag->metricIDs row.
// b cannot be re-used until Reset call.
func (mp *tagToMetricIDsRowParser) InitOnlyTail(b, tail []byte) error {
	if len(tail) == 0 {
		return fmt.Errorf("missing metricID in the tag->metricIDs row %q", b)
	if len(tail)%8 != 0 {
		return fmt.Errorf("invalid tail length in the tag->metricIDs row; got %d bytes; must be multiple of 8 bytes", len(tail))
	mp.tail = tail
	mp.metricIDsParsed = false
	return nil

// EqualPrefix returns true if prefixes for mp and x are equal.
// Prefix contains (tag)
func (mp *tagToMetricIDsRowParser) EqualPrefix(x *tagToMetricIDsRowParser) bool {
	if !mp.Tag.Equal(&x.Tag) {
		return false
	return mp.Date == x.Date && mp.NSPrefix == x.NSPrefix

// MetricIDsLen returns the number of MetricIDs in the mp.tail
func (mp *tagToMetricIDsRowParser) MetricIDsLen() int {
	return len(mp.tail) / 8

// ParseMetricIDs parses MetricIDs from mp.tail into mp.MetricIDs.
func (mp *tagToMetricIDsRowParser) ParseMetricIDs() {
	if mp.metricIDsParsed {
	tail := mp.tail
	mp.MetricIDs = mp.MetricIDs[:0]
	n := len(tail) / 8
	if n <= cap(mp.MetricIDs) {
		mp.MetricIDs = mp.MetricIDs[:n]
	} else {
		mp.MetricIDs = append(mp.MetricIDs[:cap(mp.MetricIDs)], make([]uint64, n-cap(mp.MetricIDs))...)
	metricIDs := mp.MetricIDs
	_ = metricIDs[n-1]
	for i := 0; i < n; i++ {
		if len(tail) < 8 {
			logger.Panicf("BUG: tail cannot be smaller than 8 bytes; got %d bytes; tail=%X", len(tail), tail)
		metricID := encoding.UnmarshalUint64(tail)
		metricIDs[i] = metricID
		tail = tail[8:]
	mp.metricIDsParsed = true

// GetMatchingSeriesCount returns the number of series in mp, which match metricIDs from the given filter
// and do not match metricIDs from negativeFilter.
// if filter is empty, then all series in mp are taken into account.
func (mp *tagToMetricIDsRowParser) GetMatchingSeriesCount(filter, negativeFilter *uint64set.Set) int {
	if filter == nil && negativeFilter.Len() == 0 {
		return mp.MetricIDsLen()
	n := 0
	for _, metricID := range mp.MetricIDs {
		if filter != nil && !filter.Has(metricID) {
		if !negativeFilter.Has(metricID) {
	return n

func mergeTagToMetricIDsRows(data []byte, items []mergeset.Item) ([]byte, []mergeset.Item) {
	data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixTagToMetricIDs)
	data, items = mergeTagToMetricIDsRowsInternal(data, items, nsPrefixDateTagToMetricIDs)
	return data, items

func mergeTagToMetricIDsRowsInternal(data []byte, items []mergeset.Item, nsPrefix byte) ([]byte, []mergeset.Item) {
	// Perform quick checks whether items contain rows starting from nsPrefix
	// based on the fact that items are sorted.
	if len(items) <= 2 {
		// The first and the last row must remain unchanged.
		return data, items
	firstItem := items[0].Bytes(data)
	if len(firstItem) > 0 && firstItem[0] > nsPrefix {
		return data, items
	lastItem := items[len(items)-1].Bytes(data)
	if len(lastItem) > 0 && lastItem[0] < nsPrefix {
		return data, items

	// items contain at least one row starting from nsPrefix. Merge rows with common tag.
	tmm := getTagToMetricIDsRowsMerger()
	tmm.dataCopy = append(tmm.dataCopy[:0], data...)
	tmm.itemsCopy = append(tmm.itemsCopy[:0], items...)
	mp := &
	mpPrev := &tmm.mpPrev
	dstData := data[:0]
	dstItems := items[:0]
	for i, it := range items {
		item := it.Bytes(data)
		if len(item) == 0 || item[0] != nsPrefix || i == 0 || i == len(items)-1 {
			// Write rows not starting with nsPrefix as-is.
			// Additionally write the first and the last row as-is in order to preserve
			// sort order for adjacent blocks.
			dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
			dstData = append(dstData, item...)
			dstItems = append(dstItems, mergeset.Item{
				Start: uint32(len(dstData) - len(item)),
				End:   uint32(len(dstData)),
		if err := mp.Init(item, nsPrefix); err != nil {
			logger.Panicf("FATAL: cannot parse row starting with nsPrefix %d during merge: %s", nsPrefix, err)
		if mp.MetricIDsLen() >= maxMetricIDsPerRow {
			dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
			dstData = append(dstData, item...)
			dstItems = append(dstItems, mergeset.Item{
				Start: uint32(len(dstData) - len(item)),
				End:   uint32(len(dstData)),
		if !mp.EqualPrefix(mpPrev) {
			dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
		tmm.pendingMetricIDs = append(tmm.pendingMetricIDs, mp.MetricIDs...)
		mpPrev, mp = mp, mpPrev
		if len(tmm.pendingMetricIDs) >= maxMetricIDsPerRow {
			dstData, dstItems = tmm.flushPendingMetricIDs(dstData, dstItems, mpPrev)
	if len(tmm.pendingMetricIDs) > 0 {
		logger.Panicf("BUG: tmm.pendingMetricIDs must be empty at this point; got %d items: %d", len(tmm.pendingMetricIDs), tmm.pendingMetricIDs)
	if !checkItemsSorted(dstData, dstItems) {
		// Items could become unsorted if initial items contain duplicate metricIDs:
		//   item1: 1, 1, 5
		//   item2: 1, 4
		// Items could become the following after the merge:
		//   item1: 1, 5
		//   item2: 1, 4
		// i.e. item1 > item2
		// Leave the original items unmerged, so they can be merged next time.
		// This case should be quite rare - if multiple data points are simultaneously inserted
		// into the same new time series from multiple concurrent goroutines.
		atomic.AddUint64(&indexBlocksWithMetricIDsIncorrectOrder, 1)
		dstData = append(dstData[:0], tmm.dataCopy...)
		dstItems = append(dstItems[:0], tmm.itemsCopy...)
		if !checkItemsSorted(dstData, dstItems) {
			logger.Panicf("BUG: the original items weren't sorted; items=%q", dstItems)
	atomic.AddUint64(&indexBlocksWithMetricIDsProcessed, 1)
	return dstData, dstItems

var indexBlocksWithMetricIDsIncorrectOrder uint64
var indexBlocksWithMetricIDsProcessed uint64

func checkItemsSorted(data []byte, items []mergeset.Item) bool {
	if len(items) == 0 {
		return true
	prevItem := items[0].String(data)
	for _, it := range items[1:] {
		currItem := it.String(data)
		if prevItem > currItem {
			return false
		prevItem = currItem
	return true

// maxMetricIDsPerRow limits the number of metricIDs in tag->metricIDs row.
// This reduces overhead on index and metaindex in lib/mergeset.
const maxMetricIDsPerRow = 64

type uint64Sorter []uint64

func (s uint64Sorter) Len() int { return len(s) }
func (s uint64Sorter) Less(i, j int) bool {
	return s[i] < s[j]
func (s uint64Sorter) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]

type tagToMetricIDsRowsMerger struct {
	pendingMetricIDs uint64Sorter
	mp               tagToMetricIDsRowParser
	mpPrev           tagToMetricIDsRowParser

	itemsCopy []mergeset.Item
	dataCopy  []byte

func (tmm *tagToMetricIDsRowsMerger) Reset() {
	tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]

	tmm.itemsCopy = tmm.itemsCopy[:0]
	tmm.dataCopy = tmm.dataCopy[:0]

func (tmm *tagToMetricIDsRowsMerger) flushPendingMetricIDs(dstData []byte, dstItems []mergeset.Item, mp *tagToMetricIDsRowParser) ([]byte, []mergeset.Item) {
	if len(tmm.pendingMetricIDs) == 0 {
		// Nothing to flush
		return dstData, dstItems
	// Use sort.Sort instead of sort.Slice in order to reduce memory allocations.
	tmm.pendingMetricIDs = removeDuplicateMetricIDs(tmm.pendingMetricIDs)

	// Marshal pendingMetricIDs
	dstDataLen := len(dstData)
	dstData = mp.MarshalPrefix(dstData)
	for _, metricID := range tmm.pendingMetricIDs {
		dstData = encoding.MarshalUint64(dstData, metricID)
	dstItems = append(dstItems, mergeset.Item{
		Start: uint32(dstDataLen),
		End:   uint32(len(dstData)),
	tmm.pendingMetricIDs = tmm.pendingMetricIDs[:0]
	return dstData, dstItems

func removeDuplicateMetricIDs(sortedMetricIDs []uint64) []uint64 {
	if len(sortedMetricIDs) < 2 {
		return sortedMetricIDs
	prevMetricID := sortedMetricIDs[0]
	hasDuplicates := false
	for _, metricID := range sortedMetricIDs[1:] {
		if prevMetricID == metricID {
			hasDuplicates = true
		prevMetricID = metricID
	if !hasDuplicates {
		return sortedMetricIDs
	dstMetricIDs := sortedMetricIDs[:1]
	prevMetricID = sortedMetricIDs[0]
	for _, metricID := range sortedMetricIDs[1:] {
		if prevMetricID == metricID {
		dstMetricIDs = append(dstMetricIDs, metricID)
		prevMetricID = metricID
	return dstMetricIDs

func getTagToMetricIDsRowsMerger() *tagToMetricIDsRowsMerger {
	v := tmmPool.Get()
	if v == nil {
		return &tagToMetricIDsRowsMerger{}
	return v.(*tagToMetricIDsRowsMerger)

func putTagToMetricIDsRowsMerger(tmm *tagToMetricIDsRowsMerger) {

var tmmPool sync.Pool