VictoriaMetrics/app/vmselect/netstorage/tenant_cache.go
Zakhar Bessarab 44b071296d
vmselect: add support of multi-tenant queries (#6346)
### Describe Your Changes

Added the ability to query data across multiple tenants. See:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1434

Currently, the following endpoints work with multi-tenancy (a request sketch follows the list):
- /prometheus/api/v1/query
- /prometheus/api/v1/query_range
- /prometheus/api/v1/series
- /prometheus/api/v1/labels
- /prometheus/api/v1/label/<label_name>/values
- /prometheus/api/v1/status/active_queries
- /prometheus/api/v1/status/top_queries
- /prometheus/api/v1/status/tsdb
- /prometheus/api/v1/export
- /prometheus/api/v1/export/csv
- /vmui
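
For illustration, here is a minimal sketch of issuing a multi-tenant instant query from Go. The `multitenant` path segment and the `vmselect:8481` address are assumptions about a typical cluster setup, not something defined by this change:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Assumed multi-tenant query endpoint exposed by vmselect; adjust host/port to your setup.
	base := "http://vmselect:8481/select/multitenant/prometheus/api/v1/query"
	params := url.Values{"query": {"up"}}

	resp, err := http.Get(base + "?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // the response contains series from all matching tenants
}
```

The same pattern applies to the other endpoints listed above.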


A note regarding VMUI: endpoints such as `active_queries` and
`top_queries` have been updated to indicate whether a query was
single-tenant or multi-tenant, but the UI needs to be updated to
display this info.
cc: @Loori-R 

---------

Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: f41gh7 <nik@victoriametrics.com>
2024-10-01 16:37:18 +02:00


package netstorage

import (
	"flag"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/metrics"

	"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)

var (
	tenantsCacheDuration = flag.Duration("search.tenantCacheExpireDuration", 5*time.Minute, "The expiry duration for the list of tenants for multi-tenant queries.")
)
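
// For example (a hedged invocation sketch; the binary path is illustrative):
//
//	/path/to/vmselect -search.tenantCacheExpireDuration=10m ...
//
// raises the tenants cache expiry from the default 5 minutes to 10 minutes.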

// TenantsCached returns the list of tenants available in the storage.
func TenantsCached(qt *querytracer.Tracer, tr storage.TimeRange, deadline searchutils.Deadline) ([]storage.TenantToken, error) {
	qt.Printf("fetching tenants on timeRange=%s", tr.String())

	cached := tenantsCacheV.get(tr)
	qt.Printf("fetched %d tenants from cache", len(cached))
	if len(cached) > 0 {
		return cached, nil
	}

	tenants, err := Tenants(qt, tr, deadline)
	if err != nil {
		return nil, fmt.Errorf("cannot obtain tenants: %w", err)
	}
	qt.Printf("fetched %d tenants from storage", len(tenants))

	tt := make([]storage.TenantToken, len(tenants))
	for i, t := range tenants {
		accountID, projectID, err := auth.ParseToken(t)
		if err != nil {
			return nil, fmt.Errorf("cannot parse tenant token %q: %w", t, err)
		}
		tt[i].AccountID = accountID
		tt[i].ProjectID = projectID
	}

	tenantsCacheV.put(tr, tt)
	qt.Printf("put %d tenants into cache", len(tenants))
	return tt, nil
}
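
// tenantTokensForRangeExample is a hedged usage sketch, not part of the upstream change:
// it shows how a caller could resolve the tenant tokens for a query time range.
// The nil tracer and the 30s/-search.maxQueryDuration deadline are assumptions for
// illustration; real handlers pass the request's own tracer and deadline.
func tenantTokensForRangeExample(tr storage.TimeRange) ([]storage.TenantToken, error) {
	// querytracer.Tracer methods are nil-safe, so a nil tracer simply disables tracing.
	deadline := searchutils.NewDeadline(time.Now(), 30*time.Second, "-search.maxQueryDuration")
	return TenantsCached(nil, tr, deadline)
}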

var tenantsCacheV = func() *tenantsCache {
	tc := newTenantsCache(*tenantsCacheDuration)
	return tc
}()

type tenantsCacheItem struct {
	tenants []storage.TenantToken
	tr      storage.TimeRange
	expires time.Time
}

type tenantsCache struct {
	// items is used for intersection matches lookup
	items []*tenantsCacheItem

	itemExpiration time.Duration

	requests atomic.Uint64
	misses   atomic.Uint64

	mu sync.Mutex
}

func newTenantsCache(expiration time.Duration) *tenantsCache {
	tc := &tenantsCache{
		items:          make([]*tenantsCacheItem, 0),
		itemExpiration: expiration,
	}

	metrics.GetOrCreateGauge(`vm_cache_requests_total{type="multitenancy/tenants"}`, func() float64 {
		return float64(tc.Requests())
	})
	metrics.GetOrCreateGauge(`vm_cache_misses_total{type="multitenancy/tenants"}`, func() float64 {
		return float64(tc.Misses())
	})
	metrics.GetOrCreateGauge(`vm_cache_entries{type="multitenancy/tenants"}`, func() float64 {
		return float64(tc.Len())
	})

	return tc
}

// cleanupLocked drops items which expire within the next itemExpiration window.
// It must be called with tc.mu held.
func (tc *tenantsCache) cleanupLocked() {
	expires := time.Now().Add(tc.itemExpiration)
	for i := len(tc.items) - 1; i >= 0; i-- {
		if tc.items[i].expires.Before(expires) {
			tc.items = append(tc.items[:i], tc.items[i+1:]...)
		}
	}
}

func (tc *tenantsCache) put(tr storage.TimeRange, tenants []storage.TenantToken) {
	tc.mu.Lock()
	defer tc.mu.Unlock()

	alignTrToDay(&tr)

	exp := time.Now().Add(timeutil.AddJitterToDuration(tc.itemExpiration))
	ci := &tenantsCacheItem{
		tenants: tenants,
		tr:      tr,
		expires: exp,
	}
	tc.items = append(tc.items, ci)
}

func (tc *tenantsCache) Requests() uint64 {
	return tc.requests.Load()
}

func (tc *tenantsCache) Misses() uint64 {
	return tc.misses.Load()
}

func (tc *tenantsCache) Len() uint64 {
	tc.mu.Lock()
	n := len(tc.items)
	tc.mu.Unlock()
	return uint64(n)
}

func (tc *tenantsCache) get(tr storage.TimeRange) []storage.TenantToken {
	tc.requests.Add(1)

	alignTrToDay(&tr)
	return tc.getInternal(tr)
}

func (tc *tenantsCache) getInternal(tr storage.TimeRange) []storage.TenantToken {
	tc.mu.Lock()
	defer tc.mu.Unlock()

	if len(tc.items) == 0 {
		return nil
	}

	result := make(map[storage.TenantToken]struct{})
	cleanupNeeded := false
	for idx := range tc.items {
		ci := tc.items[idx]
		if ci.expires.Before(time.Now()) {
			cleanupNeeded = true
		}
		if hasIntersection(tr, ci.tr) {
			for _, t := range ci.tenants {
				result[t] = struct{}{}
			}
		}
	}

	if cleanupNeeded {
		tc.cleanupLocked()
	}

	tenants := make([]storage.TenantToken, 0, len(result))
	for t := range result {
		tenants = append(tenants, t)
	}
	return tenants
}

// alignTrToDay aligns the given time range to day boundaries:
// tr.MinTimestamp is set to the start of its day and
// tr.MaxTimestamp is set to the end of its day.
func alignTrToDay(tr *storage.TimeRange) {
	tr.MinTimestamp = timeutil.StartOfDay(tr.MinTimestamp)
	tr.MaxTimestamp = timeutil.EndOfDay(tr.MaxTimestamp)
}

// hasIntersection checks whether the given time ranges intersect.
func hasIntersection(a, b storage.TimeRange) bool {
	return a.MinTimestamp <= b.MaxTimestamp && a.MaxTimestamp >= b.MinTimestamp
}