vllogs: implement tenants endpoint for victorialogs

dmitryk-dk committed 2024-11-07 21:51:43 +01:00
commit 2c96f3512d (parent f62502a943)
8 changed files with 285 additions and 2 deletions


@@ -839,6 +839,45 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
}
// ProcessAdminTenantsRequest processes /select/admin/tenants request.
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
	start, okStart, err := getTimeNsec(r, "start")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	end, okEnd, err := getTimeNsec(r, "end")
	if err != nil {
		httpserver.Errorf(w, r, "%s", err)
		return
	}
	if !okStart {
		start = math.MinInt64
	}
	if !okEnd {
		end = math.MaxInt64
	}

	bw := getBufferedWriter(w)
	defer func() {
		bw.FlushIgnoreErrors()
		putBufferedWriter(bw)
	}()
	w.Header().Set("Content-Type", "application/json")

	tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
	if err != nil {
		httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
		return
	}

	bb := blockResultPool.Get()
	WriteTenantsResponse(bb, tenants)
	bw.WriteIgnoreErrors(bb.B)
	blockResultPool.Put(bb)
}
var blockResultPool bytesutil.ByteBufferPool
type row struct {
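For reference, a minimal client-side sketch (not part of this commit) of how the new endpoint can be queried and its JSON body decoded. The listen address assumes the default VictoriaLogs port; the optional start/end query args are omitted here, so the handler above falls back to the full time range:

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

type tenantsResponse struct {
	Status string   `json:"status"`
	Data   []string `json:"data"`
}

func main() {
	// Assumed address: the default VictoriaLogs HTTP listen port.
	resp, err := http.Get("http://localhost:9428/select/admin/tenants")
	if err != nil {
		log.Fatalf("cannot query tenants: %s", err)
	}
	defer resp.Body.Close()

	var tr tenantsResponse
	if err := json.NewDecoder(resp.Body).Decode(&tr); err != nil {
		log.Fatalf("cannot decode tenants response: %s", err)
	}
	fmt.Println(tr.Data) // e.g. [0:0 1:2]
}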


@@ -0,0 +1,15 @@
{% stripspace %}
TenantsResponse generates response for /select/admin/tenants.
{% func TenantsResponse(tenants []string) %}
{
"status":"success",
"data":[
{% for i, tenant := range tenants %}
{%q= tenant %}
{% if i+1 < len(tenants) %},{% endif %}
{% endfor %}
]
}
{% endfunc %}
{% endstripspace %}


@@ -0,0 +1,67 @@
// Code generated by qtc from "tenants_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// TenantsResponse generates response for /select/admin/tenants.
//line app/vlselect/logsql/tenants_response.qtpl:4
package logsql
//line app/vlselect/logsql/tenants_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vlselect/logsql/tenants_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vlselect/logsql/tenants_response.qtpl:4
func StreamTenantsResponse(qw422016 *qt422016.Writer, tenants []string) {
//line app/vlselect/logsql/tenants_response.qtpl:4
qw422016.N().S(`{"status":"success","data":[`)
//line app/vlselect/logsql/tenants_response.qtpl:8
for i, tenant := range tenants {
//line app/vlselect/logsql/tenants_response.qtpl:9
qw422016.N().Q(tenant)
//line app/vlselect/logsql/tenants_response.qtpl:10
if i+1 < len(tenants) {
//line app/vlselect/logsql/tenants_response.qtpl:10
qw422016.N().S(`,`)
//line app/vlselect/logsql/tenants_response.qtpl:10
}
//line app/vlselect/logsql/tenants_response.qtpl:11
}
//line app/vlselect/logsql/tenants_response.qtpl:11
qw422016.N().S(`]}`)
//line app/vlselect/logsql/tenants_response.qtpl:14
}
//line app/vlselect/logsql/tenants_response.qtpl:14
func WriteTenantsResponse(qq422016 qtio422016.Writer, tenants []string) {
//line app/vlselect/logsql/tenants_response.qtpl:14
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vlselect/logsql/tenants_response.qtpl:14
StreamTenantsResponse(qw422016, tenants)
//line app/vlselect/logsql/tenants_response.qtpl:14
qt422016.ReleaseWriter(qw422016)
//line app/vlselect/logsql/tenants_response.qtpl:14
}
//line app/vlselect/logsql/tenants_response.qtpl:14
func TenantsResponse(tenants []string) string {
//line app/vlselect/logsql/tenants_response.qtpl:14
qb422016 := qt422016.AcquireByteBuffer()
//line app/vlselect/logsql/tenants_response.qtpl:14
WriteTenantsResponse(qb422016, tenants)
//line app/vlselect/logsql/tenants_response.qtpl:14
qs422016 := string(qb422016.B)
//line app/vlselect/logsql/tenants_response.qtpl:14
qt422016.ReleaseByteBuffer(qb422016)
//line app/vlselect/logsql/tenants_response.qtpl:14
return qs422016
//line app/vlselect/logsql/tenants_response.qtpl:14
}
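A quick sanity sketch (not from the commit) for the generated helpers above: because the template is wrapped in stripspace, rendering two sample tenants should produce compact JSON with no extra whitespace.

package logsql

import "testing"

func TestTenantsResponseSketch(t *testing.T) {
	got := TenantsResponse([]string{"0:0", "1:2"})
	want := `{"status":"success","data":["0:0","1:2"]}`
	if got != want {
		t.Fatalf("unexpected response\ngot:  %s\nwant: %s", got, want)
	}
}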


@@ -9,12 +9,13 @@ import (
"strings"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/logsql"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
)
var (
@@ -217,6 +218,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
logsqlStreamsRequests.Inc()
logsql.ProcessStreamsRequest(ctx, w, r)
return true
case "/select/admin/tenants":
logsqlAdminTenantsRequests.Inc()
logsql.ProcessAdminTenantsRequest(ctx, w, r)
return true
default:
return false
}
@@ -247,4 +252,5 @@ var (
logsqlStreamIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`)
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/admin/tenants"}`)
)


@@ -176,6 +176,11 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
return strg.GetStreamIDs(ctx, tenantIDs, q, limit)
}
// GetTenantIDs returns tenantIDs in the storage.
func GetTenantIDs(ctx context.Context, start, end int64) ([]string, error) {
return strg.GetTenantIDs(ctx, start, end)
}
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)


@@ -437,6 +437,53 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
return ids
}
func (is *indexSearch) getTenantIDs() []string {
	tenants := make(map[string]struct{})
	ts := &is.ts
	kb := &is.kb
	tID := TenantID{0, 0}
	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
	ts.Seek(kb.B)
	for ts.NextItem() {
		_, prefix, err := unmarshalCommonPrefix(&tID, ts.Item)
		if err != nil {
			logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
		}
		if prefix != nsPrefixStreamID {
			// Reached the end of the entries with the needed prefix.
			break
		}
		tenant := fmt.Sprintf("%d:%d", tID.AccountID, tID.ProjectID)
		tenants[tenant] = struct{}{}

		// Seek for the next (accountID, projectID)
		tID.ProjectID++
		if tID.ProjectID == 0 {
			tID.AccountID++
			if tID.AccountID == 0 {
				// Reached the end of the (accountID, projectID) space
				break
			}
		}
		kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
		ts.Seek(kb.B)
	}
	if err := ts.Error(); err != nil {
		logger.Panicf("FATAL: error when performing search: %s", err)
	}

	tenantIDs := make([]string, 0)
	for tenantID := range tenants {
		tenantIDs = append(tenantIDs, tenantID)
	}
	return tenantIDs
}
func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical []byte) {
st := GetStreamTags()
mustUnmarshalStreamTags(st, streamTagsCanonical)
@@ -542,6 +589,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
bbPool.Put(bb)
}
func (idb *indexdb) searchTenants() []string {
	is := idb.getIndexSearch()
	defer idb.putIndexSearch(is)
	return is.getTenantIDs()
}
type batchItems struct {
buf []byte
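The only subtle part of getTenantIDs above is how it skips from one tenant to the next: after recording (AccountID, ProjectID) it increments ProjectID, carries the overflow into AccountID, and re-seeks at the new prefix. A standalone sketch of that increment follows; the TenantID struct is a local stand-in with uint32 fields (an assumption consistent with the overflow checks in the loop), not the real logstorage type.

package main

import "fmt"

type TenantID struct {
	AccountID uint32
	ProjectID uint32
}

// nextTenantID returns the smallest tenantID strictly greater than tid,
// and false once the whole (accountID, projectID) space has been exhausted.
func nextTenantID(tid TenantID) (TenantID, bool) {
	tid.ProjectID++
	if tid.ProjectID == 0 {
		tid.AccountID++
		if tid.AccountID == 0 {
			return tid, false
		}
	}
	return tid, true
}

func main() {
	fmt.Println(nextTenantID(TenantID{AccountID: 0, ProjectID: 41}))                  // {0 42} true
	fmt.Println(nextTenantID(TenantID{AccountID: 7, ProjectID: 4294967295}))          // {8 0} true
	fmt.Println(nextTenantID(TenantID{AccountID: 4294967295, ProjectID: 4294967295})) // {0 0} false
}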


@@ -112,7 +112,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
// non-existing-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)
//non-existing-non-empty-tag-re
// non-existing-non-empty-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)
// match-job-instance
@@ -252,3 +252,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
closeTestStorage(s)
}
func TestGetTenantsIds(t *testing.T) {
}


@@ -394,6 +394,36 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Que
return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
}
// GetTenantIDs returns tenantIDs for the logs stored on the given [start, end] time range in nanoseconds.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]string, error) {
	return s.getTenantIDs(ctx, start, end)
}

func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]string, error) {
	workersCount := cgroup.AvailableCPUs()
	stopCh := ctx.Done()

	tenantIDs := make([][]string, workersCount)
	processPartitions := func(pt *partition, workerID uint) {
		tenants := pt.idb.searchTenants()
		tenantIDs[workerID] = append(tenantIDs[workerID], tenants...)
	}
	s.searchPartitions(workersCount, stopCh, processPartitions, start, end)

	// Deduplicate tenantIDs collected by the concurrent workers and return them sorted.
	m := make(map[string]struct{})
	for _, tids := range tenantIDs {
		for _, tid := range tids {
			m[tid] = struct{}{}
		}
	}
	tenants := make([]string, 0, len(m))
	for tid := range m {
		tenants = append(tenants, tid)
	}
	sort.Strings(tenants)
	return tenants, nil
}
func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex
@@ -726,6 +756,69 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
}
}
func (s *Storage) searchPartitions(workersCount int, stopCh <-chan struct{}, process func(*partition, uint), start, end int64) {
	// Spin up workers
	var wgWorkers sync.WaitGroup
	workCh := make(chan *partition, workersCount)
	wgWorkers.Add(workersCount)
	for i := 0; i < workersCount; i++ {
		go func(workerID uint) {
			for pt := range workCh {
				if needStop(stopCh) {
					// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
					continue
				}
				process(pt, workerID)
			}
			wgWorkers.Done()
		}(uint(i))
	}

	// Select partitions according to the selected time range
	s.partitionsLock.Lock()
	ptws := s.partitions

	minDay := start / nsecsPerDay
	n := sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day >= minDay
	})
	ptws = ptws[n:]

	maxDay := end / nsecsPerDay
	n = sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day > maxDay
	})
	ptws = ptws[:n]

	// Copy the selected partitions, so they don't interfere with s.partitions.
	ptws = append([]*partitionWrapper{}, ptws...)
	for _, ptw := range ptws {
		ptw.incRef()
	}
	s.partitionsLock.Unlock()

	// Schedule concurrent search across matching partitions.
	var wgSearchers sync.WaitGroup
	for _, ptw := range ptws {
		partitionSearchConcurrencyLimitCh <- struct{}{}
		wgSearchers.Add(1)
		go func(pt *partition) {
			workCh <- pt
			wgSearchers.Done()
			<-partitionSearchConcurrencyLimitCh
		}(ptw.pt)
	}
	wgSearchers.Wait()

	// Wait until workers finish their work
	close(workCh)
	wgWorkers.Wait()

	// Decrement references to partitions
	for _, ptw := range ptws {
		ptw.decRef()
	}
}
// partitionSearchConcurrencyLimitCh limits the number of concurrent searches in partitions.
//
// This is needed for limiting memory usage under high load.
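searchPartitions above combines a bounded worker pool with day-granularity partition selection. The selection itself is just two sort.Search calls over partitions kept sorted by day; here is a self-contained sketch of that part, where fakePartition stands in for the real partitionWrapper:

package main

import (
	"fmt"
	"sort"
)

const nsecsPerDay int64 = 24 * 3600 * 1e9

type fakePartition struct {
	day int64
}

// selectPartitions trims ptws (sorted by day) to the partitions whose day
// falls into the [start, end] nanosecond range, mirroring searchPartitions.
func selectPartitions(ptws []fakePartition, start, end int64) []fakePartition {
	minDay := start / nsecsPerDay
	n := sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day >= minDay
	})
	ptws = ptws[n:]

	maxDay := end / nsecsPerDay
	n = sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day > maxDay
	})
	return ptws[:n]
}

func main() {
	ptws := []fakePartition{{day: 19990}, {day: 19991}, {day: 19992}, {day: 19993}}
	selected := selectPartitions(ptws, 19991*nsecsPerDay, 19992*nsecsPerDay)
	fmt.Println(selected) // [{19991} {19992}]
}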