diff --git a/README.md b/README.md
index 2f5555ea1..bbc59b61b 100644
--- a/README.md
+++ b/README.md
@@ -3184,7 +3184,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
   -search.maxPointsSubqueryPerTimeseries int
      The maximum number of points per series, which can be generated by subquery. See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3 (default 100000)
   -search.maxQueryDuration duration
-     The maximum duration for query execution (default 30s)
+     The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg (default 30s)
   -search.maxQueryLen size
      The maximum search query length in bytes
      Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)
diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index b8b8ddb34..15ddc8dfe 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -10,6 +10,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/VictoriaMetrics/metrics"
+
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@@ -250,6 +252,39 @@ func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter,
 	WriteValuesWithHitsJSON(w, values)
 }
 
+// ProcessStreamIDsRequest processes /select/logsql/stream_ids request.
+//
+// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream_ids
+func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+	q, tenantIDs, err := parseCommonArgs(r)
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+
+	// Parse limit query arg
+	limit, err := httputils.GetInt(r, "limit")
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+	if limit < 0 {
+		limit = 0
+	}
+
+	// Obtain streamIDs for the given query
+	q.Optimize()
+	streamIDs, err := vlstorage.GetStreamIDs(ctx, tenantIDs, q, uint64(limit))
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot obtain stream_ids: %s", err)
+		return
+	}
+
+	// Write results
+	w.Header().Set("Content-Type", "application/json")
+	WriteValuesWithHitsJSON(w, streamIDs)
+}
+
 // ProcessStreamsRequest processes /select/logsql/streams request.
 //
 // See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
@@ -282,6 +317,216 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
 	WriteValuesWithHitsJSON(w, streams)
 }
 
+// ProcessLiveTailRequest processes live tailing request to /select/logsql/tail
+//
+// It repeatedly runs the query over a sliding time window every refresh_interval
+// and writes the not-yet-written rows to w until ctx is canceled or an error occurs.
+func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
+	liveTailRequests.Inc()
+	defer liveTailRequests.Dec()
+
+	q, tenantIDs, err := parseCommonArgs(r)
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+	if !q.CanLiveTail() {
+		httpserver.Errorf(w, r, "the query [%s] cannot be used in live tailing; see https://docs.victoriametrics.com/victorialogs/querying/#live-tailing for details", q)
+		return
+	}
+	q.Optimize()
+
+	refreshIntervalMsecs, err := httputils.GetDuration(r, "refresh_interval", 1000)
+	if err != nil {
+		httpserver.Errorf(w, r, "%s", err)
+		return
+	}
+	refreshInterval := time.Millisecond * time.Duration(refreshIntervalMsecs)
+
+	ctxWithCancel, cancel := context.WithCancel(ctx)
+	// Release the resources associated with ctxWithCancel on every return path.
+	defer cancel()
+	tp := newTailProcessor(cancel)
+
+	ticker := time.NewTicker(refreshInterval)
+	defer ticker.Stop()
+
+	end := time.Now().UnixNano()
+	doneCh := ctxWithCancel.Done()
+	for {
+		// Re-query the last tailOffsetNsecs before the previous end;
+		// rows already written are dropped by tp.getTailRows().
+		start := end - tailOffsetNsecs
+		end = time.Now().UnixNano()
+
+		qCopy := q.Clone()
+		qCopy.AddTimeFilter(start, end)
+		if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, qCopy, tp.writeBlock); err != nil {
+			httpserver.Errorf(w, r, "cannot execute tail query [%s]: %s", q, err)
+			return
+		}
+		resultRows, err := tp.getTailRows()
+		if err != nil {
+			httpserver.Errorf(w, r, "cannot get tail results for query [%q]: %s", q, err)
+			return
+		}
+		WriteJSONRows(w, resultRows)
+
+		select {
+		case <-doneCh:
+			return
+		case <-ticker.C:
+		}
+	}
+}
+
+// liveTailRequests is incremented when a live tailing request starts and decremented when it finishes.
+var liveTailRequests = metrics.NewCounter(`vl_live_tailing_requests`)
+
+// tailOffsetNsecs is the overlap (5 seconds in nanoseconds) between consecutive live tailing queries.
+const tailOffsetNsecs = 5e9
+
+// logRow is a single log entry collected by tailProcessor.
+type logRow struct {
+	timestamp int64
+	fields    []logstorage.Field
+}
+
+// sortLogRows sorts rows in place by ascending timestamp.
+func sortLogRows(rows []logRow) {
+	sort.Slice(rows, func(i, j int) bool {
+		return rows[i].timestamp < rows[j].timestamp
+	})
+}
+
+// tailProcessor accumulates query result rows between live tailing iterations.
+//
+// perStreamRows holds rows collected since the previous getTailRows call, keyed by _stream_id value.
+// lastTimestamps holds the timestamp of the last row returned by getTailRows for every _stream_id value.
+// err holds the first error detected by writeBlock; cancel is called when it is set.
+type tailProcessor struct {
+	cancel func()
+
+	mu sync.Mutex
+
+	perStreamRows  map[string][]logRow
+	lastTimestamps map[string]int64
+
+	err error
+}
+
+// newTailProcessor returns tailProcessor, which calls cancel when writeBlock detects an error.
+func newTailProcessor(cancel func()) *tailProcessor {
+	return &tailProcessor{
+		cancel: cancel,
+
+		perStreamRows:  make(map[string][]logRow),
+		lastTimestamps: make(map[string]int64),
+	}
+}
+
+// writeBlock copies the rows from the given block into tp.perStreamRows.
+//
+// The block must contain _time and _stream_id columns. Otherwise tp.err is set and tp.cancel is called.
+func (tp *tailProcessor) writeBlock(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
+	if len(timestamps) == 0 {
+		return
+	}
+
+	tp.mu.Lock()
+	defer tp.mu.Unlock()
+
+	if tp.err != nil {
+		return
+	}
+
+	// Make sure columns contain _time and _stream_id fields.
+	// These fields are needed for proper tail work.
+	hasTime := false
+	hasStreamID := false
+	for _, c := range columns {
+		if c.Name == "_time" {
+			hasTime = true
+		}
+		if c.Name == "_stream_id" {
+			hasStreamID = true
+		}
+	}
+	if !hasTime {
+		tp.err = fmt.Errorf("missing _time field")
+		tp.cancel()
+		return
+	}
+	if !hasStreamID {
+		tp.err = fmt.Errorf("missing _stream_id field")
+		tp.cancel()
+		return
+	}
+
+	// Copy block rows to tp.perStreamRows
+	for i, timestamp := range timestamps {
+		streamID := ""
+		fields := make([]logstorage.Field, len(columns))
+		for j, c := range columns {
+			name := strings.Clone(c.Name)
+			value := strings.Clone(c.Values[i])
+
+			fields[j] = logstorage.Field{
+				Name:  name,
+				Value: value,
+			}
+
+			if name == "_stream_id" {
+				streamID = value
+			}
+		}
+		tp.perStreamRows[streamID] = append(tp.perStreamRows[streamID], logRow{
+			timestamp: timestamp,
+			fields:    fields,
+		})
+	}
+}
+
+// getTailRows returns the collected rows, which weren't returned by the previous calls, sorted by timestamp.
+//
+// Rows are considered already returned if their timestamp doesn't exceed
+// the timestamp of the last row returned for the same _stream_id.
+func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) {
+	if tp.err != nil {
+		return nil, tp.err
+	}
+
+	var resultRows []logRow
+	for streamID, rows := range tp.perStreamRows {
+		sortLogRows(rows)
+
+		lastTimestamp, ok := tp.lastTimestamps[streamID]
+		if ok {
+			// Skip already written rows
+			for len(rows) > 0 && rows[0].timestamp <= lastTimestamp {
+				rows = rows[1:]
+			}
+		}
+		if len(rows) == 0 {
+			// All the rows for this stream were already written.
+			// Keep the previous lastTimestamp.
+			continue
+		}
+		resultRows = append(resultRows, rows...)
+		tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp
+	}
+	clear(tp.perStreamRows)
+
+	sortLogRows(resultRows)
+
+	tailRows := make([][]logstorage.Field, len(resultRows))
+	for i, row := range resultRows {
+		tailRows[i] = row.fields
+	}
+
+	return tailRows, nil
+}
+
 // ProcessQueryRequest handles /select/logsql/query request.
 //
 // See https://docs.victoriametrics.com/victorialogs/querying/#http-api
@@ -344,6 +589,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 
 	if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
 		httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
+		return
 	}
 }
 
diff --git a/app/vlselect/main.go b/app/vlselect/main.go
index e43ff42be..c91480682 100644
--- a/app/vlselect/main.go
+++ b/app/vlselect/main.go
@@ -1,6 +1,7 @@
 package vlselect
 
 import (
+	"context"
 	"embed"
 	"flag"
 	"fmt"
@@ -13,7 +14,6 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
 	"github.com/VictoriaMetrics/metrics"
 )
 
@@ -23,7 +23,7 @@ var (
 		"See also -search.maxQueueDuration")
 	maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the search request waits for execution when -search.maxConcurrentRequests "+
 		"limit is reached; see also -search.maxQueryDuration")
-	maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
+	maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg")
 )
 
 func getDefaultMaxConcurrentRequests() int {
@@ -98,47 +98,83 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		return true
 	}
 
-	// Limit the number of concurrent queries, which can consume big amounts of CPU.
+	// Limit the number of concurrent queries, which can consume big amounts of CPU time.
startTime := time.Now() ctx := r.Context() - stopCh := ctx.Done() + d := getMaxQueryDuration(r) + ctxWithTimeout, cancel := context.WithTimeout(ctx, d) + defer cancel() + + stopCh := ctxWithTimeout.Done() select { case concurrencyLimitCh <- struct{}{}: defer func() { <-concurrencyLimitCh }() default: // Sleep for a while until giving up. This should resolve short bursts in requests. concurrencyLimitReached.Inc() - d := getMaxQueryDuration(r) - if d > *maxQueueDuration { - d = *maxQueueDuration - } - t := timerpool.Get(d) select { case concurrencyLimitCh <- struct{}{}: - timerpool.Put(t) defer func() { <-concurrencyLimitCh }() case <-stopCh: - timerpool.Put(t) - remoteAddr := httpserver.GetQuotedRemoteAddr(r) - requestURI := httpserver.GetRequestURI(r) - logger.Infof("client has cancelled the request after %.3f seconds: remoteAddr=%s, requestURI: %q", - time.Since(startTime).Seconds(), remoteAddr, requestURI) - return true - case <-t.C: - timerpool.Put(t) - concurrencyLimitTimeout.Inc() - err := &httpserver.ErrorWithStatusCode{ - Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+ - "are executed. 
Possible solutions: to reduce query load; to add more compute resources to the server; "+ - "to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests", - d.Seconds(), *maxConcurrentRequests, maxQueueDuration), - StatusCode: http.StatusServiceUnavailable, + switch ctxWithTimeout.Err() { + case context.Canceled: + remoteAddr := httpserver.GetQuotedRemoteAddr(r) + requestURI := httpserver.GetRequestURI(r) + logger.Infof("client has canceled the pending request after %.3f seconds: remoteAddr=%s, requestURI: %q", + time.Since(startTime).Seconds(), remoteAddr, requestURI) + case context.DeadlineExceeded: + concurrencyLimitTimeout.Inc() + err := &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+ + "are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+ + "to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration=%s; to increase -search.maxConcurrentRequests; "+ + "to pass bigger value to 'timeout' query arg", + d.Seconds(), *maxConcurrentRequests, maxQueueDuration, maxQueryDuration), + StatusCode: http.StatusServiceUnavailable, + } + httpserver.Errorf(w, r, "%s", err) } - httpserver.Errorf(w, r, "%s", err) return true } } + if path == "/select/logsql/tail" { + logsqlTailRequests.Inc() + // Process live tailing request without timeout (e.g. use ctx instead of ctxWithTimeout), + // since it is OK to run live tailing requests for very long time. 
+ logsql.ProcessLiveTailRequest(ctx, w, r) + return true + } + + ok := processSelectRequest(ctxWithTimeout, w, r, path) + if !ok { + return false + } + + err := ctxWithTimeout.Err() + switch err { + case nil: + // nothing to do + case context.Canceled: + remoteAddr := httpserver.GetQuotedRemoteAddr(r) + requestURI := httpserver.GetRequestURI(r) + logger.Infof("client has canceled the request after %.3f seconds: remoteAddr=%s, requestURI: %q", + time.Since(startTime).Seconds(), remoteAddr, requestURI) + case context.DeadlineExceeded: + err = &httpserver.ErrorWithStatusCode{ + Err: fmt.Errorf("the request couldn't be executed in %.3f seconds; possible solutions: "+ + "to increase -search.maxQueryDuration=%s; to pass bigger value to 'timeout' query arg", d.Seconds(), maxQueryDuration), + StatusCode: http.StatusServiceUnavailable, + } + httpserver.Errorf(w, r, "%s", err) + default: + httpserver.Errorf(w, r, "unexpected error: %s", err) + } + + return true +} + +func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) bool { httpserver.EnableCORS(w, r) switch path { case "/select/logsql/field_names": @@ -165,6 +201,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { logsqlStreamFieldValuesRequests.Inc() logsql.ProcessStreamFieldValuesRequest(ctx, w, r) return true + case "/select/logsql/stream_ids": + logsqlStreamIDsRequests.Inc() + logsql.ProcessStreamIDsRequest(ctx, w, r) + return true case "/select/logsql/streams": logsqlStreamsRequests.Inc() logsql.ProcessStreamsRequest(ctx, w, r) @@ -194,5 +234,7 @@ var ( logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`) logsqlStreamFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_names"}`) logsqlStreamFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_values"}`) + logsqlStreamIDsRequests = 
metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`) logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`) + logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`) ) diff --git a/app/vlselect/vmui/asset-manifest.json b/app/vlselect/vmui/asset-manifest.json index f71e53467..09e4caae6 100644 --- a/app/vlselect/vmui/asset-manifest.json +++ b/app/vlselect/vmui/asset-manifest.json @@ -1,13 +1,13 @@ { "files": { "main.css": "./static/css/main.1041c3d4.css", - "main.js": "./static/js/main.e54f9531.js", + "main.js": "./static/js/main.8988988c.js", "static/js/685.bebe1265.chunk.js": "./static/js/685.bebe1265.chunk.js", - "static/media/MetricsQL.md": "./static/media/MetricsQL.cb83d071da309a358bc0.md", + "static/media/MetricsQL.md": "./static/media/MetricsQL.aaabf95f2c9bf356bde4.md", "index.html": "./index.html" }, "entrypoints": [ "static/css/main.1041c3d4.css", - "static/js/main.e54f9531.js" + "static/js/main.8988988c.js" ] } \ No newline at end of file diff --git a/app/vlselect/vmui/index.html b/app/vlselect/vmui/index.html index 4d95d1331..c89727c0c 100644 --- a/app/vlselect/vmui/index.html +++ b/app/vlselect/vmui/index.html @@ -1 +1 @@ -
'+(n?e:eo(e,!0))+"
\n":""+(n?e:eo(e,!0))+"
\n"}blockquote(e){return"\n".concat(e,"\n")}html(e,t){return e}heading(e,t,n){return"
".concat(e,"
\n")}table(e,t){return t&&(t="".concat(t,"")),"".concat(e,"
")}br(){return"An error occurred:
"+eo(n.message+"",!0)+"";return t?Promise.resolve(e):e}if(t)return Promise.reject(n);throw n}}const na=new class{constructor(){Fr(this,Xo),Br(this,"defaults",{async:!1,breaks:!1,extensions:null,gfm:!0,hooks:null,pedantic:!1,renderer:null,silent:!1,tokenizer:null,walkTokens:null}),Br(this,"options",this.setOptions),Br(this,"parse",Hr(Xo,this,ea).call(this,Zo.lex,Qo.parse)),Br(this,"parseInline",Hr(Xo,this,ea).call(this,Zo.lexInline,Qo.parseInline)),Br(this,"Parser",Qo),Br(this,"Renderer",Ko),Br(this,"TextRenderer",Go),Br(this,"Lexer",Zo),Br(this,"Tokenizer",uo),Br(this,"Hooks",Jo),this.use(...arguments)}walkTokens(e,t){let n=[];for(const o of e)switch(n=n.concat(t.call(this,o)),o.type){case"table":{const e=o;for(const r of e.header)n=n.concat(this.walkTokens(r.tokens,t));for(const r of e.rows)for(const e of r)n=n.concat(this.walkTokens(e.tokens,t));break}case"list":{const e=o;n=n.concat(this.walkTokens(e.items,t));break}default:{var r;const e=o;null!==(r=this.defaults.extensions)&&void 0!==r&&null!==(r=r.childTokens)&&void 0!==r&&r[e.type]?this.defaults.extensions.childTokens[e.type].forEach((r=>{const o=e[r].flat(1/0);n=n.concat(this.walkTokens(o,t))})):e.tokens&&(n=n.concat(this.walkTokens(e.tokens,t)))}}return n}use(){const e=this.defaults.extensions||{renderers:{},childTokens:{}};for(var t=arguments.length,n=new Array(t),r=0;r
'+(n?e:Xr(e,!0))+"
\n":""+(n?e:Xr(e,!0))+"
\n"}blockquote(e){return"\n".concat(e,"\n")}html(e,t){return e}heading(e,t,n){return"
".concat(e,"
\n")}table(e,t){return t&&(t="".concat(t,"")),"".concat(e,"
")}br(){return"An error occurred:
"+Xr(n.message+"",!0)+"";return t?Promise.resolve(e):e}if(t)return Promise.reject(n);throw n}}const na=new class{constructor(){jr(this,Xo),jr(this,Jo),Ur(this,"defaults",{async:!1,breaks:!1,extensions:null,gfm:!0,hooks:null,pedantic:!1,renderer:null,silent:!1,tokenizer:null,walkTokens:null}),Ur(this,"options",this.setOptions),Ur(this,"parse",Fr(this,Jo,ea).call(this,qo.lex,Go.parse)),Ur(this,"parseInline",Fr(this,Jo,ea).call(this,qo.lexInline,Go.parseInline)),Ur(this,"Parser",Go),Ur(this,"Renderer",Zo),Ur(this,"TextRenderer",Ko),Ur(this,"Lexer",qo),Ur(this,"Tokenizer",co),Ur(this,"Hooks",Qo),this.use(...arguments)}walkTokens(e,t){let n=[];for(const o of e)switch(n=n.concat(t.call(this,o)),o.type){case"table":{const e=o;for(const r of e.header)n=n.concat(this.walkTokens(r.tokens,t));for(const r of e.rows)for(const e of r)n=n.concat(this.walkTokens(e.tokens,t));break}case"list":{const e=o;n=n.concat(this.walkTokens(e.items,t));break}default:{var r;const e=o;null!==(r=this.defaults.extensions)&&void 0!==r&&null!==(r=r.childTokens)&&void 0!==r&&r[e.type]?this.defaults.extensions.childTokens[e.type].forEach((r=>{const o=e[r].flat(1/0);n=n.concat(this.walkTokens(o,t))})):e.tokens&&(n=n.concat(this.walkTokens(e.tokens,t)))}}return n}use(){const e=this.defaults.extensions||{renderers:{},childTokens:{}};for(var t=arguments.length,n=new Array(t),r=0;r