From 2c96f3512d342bd4881e795effd82f3af12ed267 Mon Sep 17 00:00:00 2001 From: dmitryk-dk Date: Thu, 7 Nov 2024 21:51:43 +0100 Subject: [PATCH] vllogs: implement tenants endpoint for victorialogs --- app/vlselect/logsql/logsql.go | 39 ++++++++ app/vlselect/logsql/tenants_response.qtpl | 15 ++++ app/vlselect/logsql/tenants_response.qtpl.go | 67 ++++++++++++++ app/vlselect/main.go | 8 +- app/vlstorage/main.go | 5 ++ lib/logstorage/indexdb.go | 54 ++++++++++++ lib/logstorage/indexdb_test.go | 6 +- lib/logstorage/storage_search.go | 93 ++++++++++++++++++++ 8 files changed, 285 insertions(+), 2 deletions(-) create mode 100644 app/vlselect/logsql/tenants_response.qtpl create mode 100644 app/vlselect/logsql/tenants_response.qtpl.go diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index d267db2f1..73a960f9f 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -839,6 +839,45 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req } } +// ProcessAdminTenantsRequest processes /select/logsql/admin_tenants request. +func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { + + start, okStart, err := getTimeNsec(r, "start") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + end, okEnd, err := getTimeNsec(r, "end") + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + if !okStart { + start = math.MinInt64 + } + if !okEnd { + end = math.MaxInt64 + } + + bw := getBufferedWriter(w) + defer func() { + bw.FlushIgnoreErrors() + putBufferedWriter(bw) + }() + w.Header().Set("Content-Type", "application/json") + + tenants, err := vlstorage.GetTenantIDs(ctx, start, end) + if err != nil { + httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err) + return + } + + bb := blockResultPool.Get() + WriteTenantsResponse(bb, tenants) + bw.WriteIgnoreErrors(bb.B) + blockResultPool.Put(bb) +} + var blockResultPool bytesutil.ByteBufferPool type row struct { diff --git a/app/vlselect/logsql/tenants_response.qtpl b/app/vlselect/logsql/tenants_response.qtpl new file mode 100644 index 000000000..7e82e0ba7 --- /dev/null +++ b/app/vlselect/logsql/tenants_response.qtpl @@ -0,0 +1,15 @@ +{% stripspace %} + +TenantsResponse generates response for /admin/tenants . +{% func TenantsResponse(tenants []string) %} +{ + "status":"success", + "data":[ + {% for i, tenant := range tenants %} + {%q= tenant %} + {% if i+1 < len(tenants) %},{% endif %} + {% endfor %} + ] +} +{% endfunc %} +{% endstripspace %} diff --git a/app/vlselect/logsql/tenants_response.qtpl.go b/app/vlselect/logsql/tenants_response.qtpl.go new file mode 100644 index 000000000..12222f821 --- /dev/null +++ b/app/vlselect/logsql/tenants_response.qtpl.go @@ -0,0 +1,67 @@ +// Code generated by qtc from "tenants_response.qtpl". DO NOT EDIT. +// See https://github.com/valyala/quicktemplate for details. + +// TenantsResponse generates response for /admin/tenants . 
+ +//line app/vlselect/logsql/tenants_response.qtpl:4 +package logsql + +//line app/vlselect/logsql/tenants_response.qtpl:4 +import ( + qtio422016 "io" + + qt422016 "github.com/valyala/quicktemplate" +) + +//line app/vlselect/logsql/tenants_response.qtpl:4 +var ( + _ = qtio422016.Copy + _ = qt422016.AcquireByteBuffer +) + +//line app/vlselect/logsql/tenants_response.qtpl:4 +func StreamTenantsResponse(qw422016 *qt422016.Writer, tenants []string) { +//line app/vlselect/logsql/tenants_response.qtpl:4 + qw422016.N().S(`{"status":"success","data":[`) +//line app/vlselect/logsql/tenants_response.qtpl:8 + for i, tenant := range tenants { +//line app/vlselect/logsql/tenants_response.qtpl:9 + qw422016.N().Q(tenant) +//line app/vlselect/logsql/tenants_response.qtpl:10 + if i+1 < len(tenants) { +//line app/vlselect/logsql/tenants_response.qtpl:10 + qw422016.N().S(`,`) +//line app/vlselect/logsql/tenants_response.qtpl:10 + } +//line app/vlselect/logsql/tenants_response.qtpl:11 + } +//line app/vlselect/logsql/tenants_response.qtpl:11 + qw422016.N().S(`]}`) +//line app/vlselect/logsql/tenants_response.qtpl:14 +} + +//line app/vlselect/logsql/tenants_response.qtpl:14 +func WriteTenantsResponse(qq422016 qtio422016.Writer, tenants []string) { +//line app/vlselect/logsql/tenants_response.qtpl:14 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vlselect/logsql/tenants_response.qtpl:14 + StreamTenantsResponse(qw422016, tenants) +//line app/vlselect/logsql/tenants_response.qtpl:14 + qt422016.ReleaseWriter(qw422016) +//line app/vlselect/logsql/tenants_response.qtpl:14 +} + +//line app/vlselect/logsql/tenants_response.qtpl:14 +func TenantsResponse(tenants []string) string { +//line app/vlselect/logsql/tenants_response.qtpl:14 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vlselect/logsql/tenants_response.qtpl:14 + WriteTenantsResponse(qb422016, tenants) +//line app/vlselect/logsql/tenants_response.qtpl:14 + qs422016 := string(qb422016.B) +//line app/vlselect/logsql/tenants_response.qtpl:14 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vlselect/logsql/tenants_response.qtpl:14 + return qs422016 +//line app/vlselect/logsql/tenants_response.qtpl:14 +} diff --git a/app/vlselect/main.go b/app/vlselect/main.go index b8d8c7e60..dbb35ae53 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -9,12 +9,13 @@ import ( "strings" "time" + "github.com/VictoriaMetrics/metrics" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vlselect/logsql" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/metrics" ) var ( @@ -217,6 +218,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re logsqlStreamsRequests.Inc() logsql.ProcessStreamsRequest(ctx, w, r) return true + case "/select/admin/tenants": + logsqlAdminTenantsRequests.Inc() + logsql.ProcessAdminTenantsRequest(ctx, w, r) + return true default: return false } @@ -247,4 +252,5 @@ var ( logsqlStreamIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`) logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`) + logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/admin/tenants"}`) ) diff --git a/app/vlstorage/main.go 
b/app/vlstorage/main.go
index 4b8ead146..6850eaab8 100644
--- a/app/vlstorage/main.go
+++ b/app/vlstorage/main.go
@@ -176,6 +176,11 @@ func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logst
 	return strg.GetStreamIDs(ctx, tenantIDs, q, limit)
 }
 
+// GetTenantIDs returns tenantIDs for logs stored on the given [start, end] time range.
+func GetTenantIDs(ctx context.Context, start, end int64) ([]string, error) {
+	return strg.GetTenantIDs(ctx, start, end)
+}
+
 func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
 	var ss logstorage.StorageStats
 	strg.UpdateStats(&ss)
diff --git a/lib/logstorage/indexdb.go b/lib/logstorage/indexdb.go
index 89efb6015..dc127c568 100644
--- a/lib/logstorage/indexdb.go
+++ b/lib/logstorage/indexdb.go
@@ -437,6 +437,53 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
 	return ids
 }
 
+func (is *indexSearch) getTenantIDs() []string {
+	tenants := make(map[string]struct{})
+	ts := &is.ts
+	kb := &is.kb
+
+	tID := TenantID{0, 0}
+
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
+	ts.Seek(kb.B)
+
+	for ts.NextItem() {
+		_, prefix, err := unmarshalCommonPrefix(&tID, ts.Item)
+		if err != nil {
+			logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
+		}
+		if prefix != nsPrefixStreamID {
+			// Reached the end of the entries with the needed prefix.
+			break
+		}
+		tenant := fmt.Sprintf("%d:%d", tID.AccountID, tID.ProjectID)
+		tenants[tenant] = struct{}{}
+		// Seek to the next (accountID, projectID) pair.
+		tID.ProjectID++
+		if tID.ProjectID == 0 {
+			tID.AccountID++
+			if tID.AccountID == 0 {
+				// Reached the end of the (accountID, projectID) space.
+				break
+			}
+		}
+
+		kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
+		ts.Seek(kb.B)
+	}
+
+	if err := ts.Error(); err != nil {
+		logger.Panicf("FATAL: error when performing search: %s", err)
+	}
+
+	tenantIDs := make([]string, 0, len(tenants))
+	for tenantID := range tenants {
+		tenantIDs = append(tenantIDs, tenantID)
+	}
+
+	return tenantIDs
+}
+
 func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical []byte) {
 	st := GetStreamTags()
 	mustUnmarshalStreamTags(st, streamTagsCanonical)
@@ -542,6 +589,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
 	bbPool.Put(bb)
 }
 
+func (idb *indexdb) searchTenants() []string {
+	is := idb.getIndexSearch()
+	defer idb.putIndexSearch(is)
+
+	return is.getTenantIDs()
+}
+
 type batchItems struct {
 	buf []byte
 
diff --git a/lib/logstorage/indexdb_test.go b/lib/logstorage/indexdb_test.go
index 9625a4378..90b072a8c 100644
--- a/lib/logstorage/indexdb_test.go
+++ b/lib/logstorage/indexdb_test.go
@@ -112,7 +112,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
 	// non-existing-tag-re
 	f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)
 
-	//non-existing-non-empty-tag-re
+	// non-existing-non-empty-tag-re
 	f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)
 
 	// match-job-instance
@@ -252,3 +252,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
 
 	closeTestStorage(s)
 }
+
+func TestGetTenantIDs(t *testing.T) {
+
+}
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index 4144475ee..1e91b0692 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -394,6 +394,36 @@ func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Que
 	return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
 }
 
+// GetTenantIDs returns tenantIDs for logs stored on the given [start, end] time range.
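+//
+// start and end are Unix timestamps in nanoseconds. The whole retention can be
+// covered by passing math.MinInt64 and math.MaxInt64 (this is what
+// ProcessAdminTenantsRequest does when the "start" and "end" query args are omitted).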
+func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]string, error) { + return s.getTenantIDs(ctx, start, end) +} + +func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]string, error) { + workersCount := cgroup.AvailableCPUs() + stopCh := ctx.Done() + + tenantIDs := make([][]string, workersCount) + processPartitions := func(pt *partition, workerID uint) { + tenants := pt.idb.searchTenants() + tenantIDs[workerID] = append(tenantIDs[workerID], tenants...) + } + s.searchPartitions(workersCount, stopCh, processPartitions, start, end) + + m := make(map[string]struct{}) + for _, tids := range tenantIDs { + for _, tid := range tids { + m[tid] = struct{}{} + } + } + tenants := make([]string, 0, len(m)) + for tid := range m { + tenants = append(tenants, tid) + } + sort.Strings(tenants) + return tenants, nil +} + func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) { var results []ValueWithHits var resultsLock sync.Mutex @@ -726,6 +756,69 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch } } +func (s *Storage) searchPartitions(workersCount int, stopCh <-chan struct{}, process func(*partition, uint), start, end int64) { + // Spin up workers + var wgWorkers sync.WaitGroup + workCh := make(chan *partition, workersCount) + wgWorkers.Add(workersCount) + for i := 0; i < workersCount; i++ { + go func(workerID uint) { + for pt := range workCh { + if needStop(stopCh) { + // The search has been canceled. Just skip all the scheduled work in order to save CPU time. + continue + } + process(pt, workerID) + } + wgWorkers.Done() + }(uint(i)) + } + + // Select partitions according to the selected time range + s.partitionsLock.Lock() + ptws := s.partitions + minDay := start / nsecsPerDay + n := sort.Search(len(ptws), func(i int) bool { + return ptws[i].day >= minDay + }) + ptws = ptws[n:] + maxDay := end / nsecsPerDay + n = sort.Search(len(ptws), func(i int) bool { + return ptws[i].day > maxDay + }) + ptws = ptws[:n] + + // Copy the selected partitions, so they don't interfere with s.partitions. + ptws = append([]*partitionWrapper{}, ptws...) + + for _, ptw := range ptws { + ptw.incRef() + } + s.partitionsLock.Unlock() + + // Schedule concurrent search across matching partitions. + var wgSearchers sync.WaitGroup + for _, ptw := range ptws { + partitionSearchConcurrencyLimitCh <- struct{}{} + wgSearchers.Add(1) + go func(pt *partition) { + workCh <- pt + wgSearchers.Done() + <-partitionSearchConcurrencyLimitCh + }(ptw.pt) + } + wgSearchers.Wait() + + // Wait until workers finish their work + close(workCh) + wgWorkers.Wait() + + // Decrement references to partitions + for _, ptw := range ptws { + ptw.decRef() + } +} + // partitionSearchConcurrencyLimitCh limits the number of concurrent searches in partition. // // This is needed for limiting memory usage under high load.
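
Usage sketch (illustrative, not part of the patch): assuming VictoriaLogs runs with the
default -httpListenAddr=:9428, the /select/admin/tenants route registered in
processSelectRequest can be queried as shown below. The optional "start" and "end" args
are parsed by getTimeNsec (RFC3339 timestamps are used here as an example of a format it
is expected to accept); when omitted, the whole retention is scanned:

    curl 'http://localhost:9428/select/admin/tenants?start=2024-11-01T00:00:00Z&end=2024-11-07T23:59:59Z'

The response is produced by TenantsResponse, with each tenant rendered as an
"<AccountID>:<ProjectID>" string, de-duplicated across partitions and sorted
(the values below are made up):

    {"status":"success","data":["0:0","1:2"]}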