lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-27 14:18:42 +02:00
parent 96bdeb3f10
commit 87f1c8bd6c
60 changed files with 738 additions and 110 deletions

View file

@ -3184,7 +3184,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-search.maxPointsSubqueryPerTimeseries int
The maximum number of points per series, which can be generated by subquery. See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3 (default 100000)
-search.maxQueryDuration duration
The maximum duration for query execution (default 30s)
The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg (default 30s)
-search.maxQueryLen size
The maximum search query length in bytes
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)

View file

@ -10,6 +10,8 @@ import (
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
@ -250,6 +252,38 @@ func ProcessStreamFieldValuesRequest(ctx context.Context, w http.ResponseWriter,
WriteValuesWithHitsJSON(w, values)
}
// ProcessStreamIDsRequest processes /select/logsql/stream_ids request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-stream_ids
func ProcessStreamIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
// Parse limit query arg
limit, err := httputils.GetInt(r, "limit")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if limit < 0 {
limit = 0
}
// Obtain streamIDs for the given query
q.Optimize()
streamIDs, err := vlstorage.GetStreamIDs(ctx, tenantIDs, q, uint64(limit))
if err != nil {
httpserver.Errorf(w, r, "cannot obtain stream_ids: %s", err)
return
}
// Write results
w.Header().Set("Content-Type", "application/json")
WriteValuesWithHitsJSON(w, streamIDs)
}
// ProcessStreamsRequest processes /select/logsql/streams request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#querying-streams
@ -282,6 +316,189 @@ func ProcessStreamsRequest(ctx context.Context, w http.ResponseWriter, r *http.R
WriteValuesWithHitsJSON(w, streams)
}
// ProcessLiveTailRequest processes live tailing request to /select/logsql/tail
func ProcessLiveTailRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
liveTailRequests.Inc()
defer liveTailRequests.Dec()
q, tenantIDs, err := parseCommonArgs(r)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if !q.CanLiveTail() {
httpserver.Errorf(w, r, "the query [%s] cannot be used in live tailing; see https://docs.victoriametrics.com/victorialogs/querying/#live-tailing for details", q)
return
}
q.Optimize()
refreshIntervalMsecs, err := httputils.GetDuration(r, "refresh_interval", 1000)
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
refreshInterval := time.Millisecond * time.Duration(refreshIntervalMsecs)
ctxWithCancel, cancel := context.WithCancel(ctx)
tp := newTailProcessor(cancel)
ticker := time.NewTicker(refreshInterval)
defer ticker.Stop()
end := time.Now().UnixNano()
doneCh := ctxWithCancel.Done()
for {
start := end - tailOffsetNsecs
end = time.Now().UnixNano()
qCopy := q.Clone()
qCopy.AddTimeFilter(start, end)
if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, qCopy, tp.writeBlock); err != nil {
httpserver.Errorf(w, r, "cannot execute tail query [%s]: %s", q, err)
return
}
resultRows, err := tp.getTailRows()
if err != nil {
httpserver.Errorf(w, r, "cannot get tail results for query [%q]: %s", q, err)
return
}
WriteJSONRows(w, resultRows)
select {
case <-doneCh:
return
case <-ticker.C:
}
}
}
var liveTailRequests = metrics.NewCounter(`vl_live_tailing_requests`)
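// tailOffsetNsecs is the overlap between adjacent tail queries in nanoseconds (5 seconds).
//
// Every tail query starts this far before the end of the previous query, so logs ingested
// with a small delay aren't missed; tailProcessor drops rows which were already returned.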
const tailOffsetNsecs = 5e9
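// logRow holds a single log entry together with its timestamp.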
type logRow struct {
timestamp int64
fields []logstorage.Field
}
func sortLogRows(rows []logRow) {
sort.Slice(rows, func(i, j int) bool {
return rows[i].timestamp < rows[j].timestamp
})
}
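// tailProcessor buffers log rows per stream and deduplicates rows already returned to the live tailing client.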
type tailProcessor struct {
cancel func()
mu sync.Mutex
perStreamRows map[string][]logRow
lastTimestamps map[string]int64
err error
}
func newTailProcessor(cancel func()) *tailProcessor {
return &tailProcessor{
cancel: cancel,
perStreamRows: make(map[string][]logRow),
lastTimestamps: make(map[string]int64),
}
}
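// writeBlock buffers the given block of rows in tp.perStreamRows.
//
// It may be called concurrently by query workers, so the shared state is protected by tp.mu.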
func (tp *tailProcessor) writeBlock(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
if len(timestamps) == 0 {
return
}
tp.mu.Lock()
defer tp.mu.Unlock()
if tp.err != nil {
return
}
// Make sure columns contain _time and _stream_id fields.
// These fields are needed for proper live tailing.
hasTime := false
hasStreamID := false
for _, c := range columns {
if c.Name == "_time" {
hasTime = true
}
if c.Name == "_stream_id" {
hasStreamID = true
}
}
if !hasTime {
tp.err = fmt.Errorf("missing _time field")
tp.cancel()
return
}
if !hasStreamID {
tp.err = fmt.Errorf("missing _stream_id field")
tp.cancel()
return
}
// Copy block rows to tp.perStreamRows
for i, timestamp := range timestamps {
streamID := ""
fields := make([]logstorage.Field, len(columns))
for j, c := range columns {
name := strings.Clone(c.Name)
value := strings.Clone(c.Values[i])
fields[j] = logstorage.Field{
Name: name,
Value: value,
}
if name == "_stream_id" {
streamID = value
}
}
tp.perStreamRows[streamID] = append(tp.perStreamRows[streamID], logRow{
timestamp: timestamp,
fields: fields,
})
}
}
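// getTailRows returns the buffered rows which weren't returned yet, sorted by timestamp.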
func (tp *tailProcessor) getTailRows() ([][]logstorage.Field, error) {
if tp.err != nil {
return nil, tp.err
}
var resultRows []logRow
for streamID, rows := range tp.perStreamRows {
sortLogRows(rows)
lastTimestamp, ok := tp.lastTimestamps[streamID]
if ok {
// Skip already written rows
for i := range rows {
if rows[i].timestamp > lastTimestamp {
rows = rows[i:]
break
}
}
}
resultRows = append(resultRows, rows...)
tp.lastTimestamps[streamID] = rows[len(rows)-1].timestamp
}
clear(tp.perStreamRows)
sortLogRows(resultRows)
tailRows := make([][]logstorage.Field, len(resultRows))
for i, row := range resultRows {
tailRows[i] = row.fields
}
return tailRows, nil
}
// ProcessQueryRequest handles /select/logsql/query request.
//
// See https://docs.victoriametrics.com/victorialogs/querying/#http-api
@ -344,6 +561,7 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
return
}
}

View file

@ -1,6 +1,7 @@
package vlselect
import (
"context"
"embed"
"flag"
"fmt"
@ -13,7 +14,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
)
@ -23,7 +23,7 @@ var (
"See also -search.maxQueueDuration")
maxQueueDuration = flag.Duration("search.maxQueueDuration", 10*time.Second, "The maximum time the search request waits for execution when -search.maxConcurrentRequests "+
"limit is reached; see also -search.maxQueryDuration")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg")
)
func getDefaultMaxConcurrentRequests() int {
@ -98,47 +98,83 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
// Limit the number of concurrent queries, which can consume big amounts of CPU.
// Limit the number of concurrent queries, which can consume big amounts of CPU time.
startTime := time.Now()
ctx := r.Context()
stopCh := ctx.Done()
d := getMaxQueryDuration(r)
ctxWithTimeout, cancel := context.WithTimeout(ctx, d)
defer cancel()
stopCh := ctxWithTimeout.Done()
select {
case concurrencyLimitCh <- struct{}{}:
defer func() { <-concurrencyLimitCh }()
default:
// Sleep for a while until giving up. This should resolve short bursts in requests.
concurrencyLimitReached.Inc()
d := getMaxQueryDuration(r)
if d > *maxQueueDuration {
d = *maxQueueDuration
}
t := timerpool.Get(d)
select {
case concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
defer func() { <-concurrencyLimitCh }()
case <-stopCh:
timerpool.Put(t)
switch ctxWithTimeout.Err() {
case context.Canceled:
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Infof("client has cancelled the request after %.3f seconds: remoteAddr=%s, requestURI: %q",
logger.Infof("client has canceled the pending request after %.3f seconds: remoteAddr=%s, requestURI: %q",
time.Since(startTime).Seconds(), remoteAddr, requestURI)
return true
case <-t.C:
timerpool.Put(t)
case context.DeadlineExceeded:
concurrencyLimitTimeout.Inc()
err := &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("couldn't start executing the request in %.3f seconds, since -search.maxConcurrentRequests=%d concurrent requests "+
"are executed. Possible solutions: to reduce query load; to add more compute resources to the server; "+
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration; to increase -search.maxConcurrentRequests",
d.Seconds(), *maxConcurrentRequests, maxQueueDuration),
"to increase -search.maxQueueDuration=%s; to increase -search.maxQueryDuration=%s; to increase -search.maxConcurrentRequests; "+
"to pass bigger value to 'timeout' query arg",
d.Seconds(), *maxConcurrentRequests, maxQueueDuration, maxQueryDuration),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
}
return true
}
}
if path == "/select/logsql/tail" {
logsqlTailRequests.Inc()
// Process the live tailing request without a timeout (i.e. use ctx instead of ctxWithTimeout),
// since it is OK to run live tailing requests for a very long time.
logsql.ProcessLiveTailRequest(ctx, w, r)
return true
}
ok := processSelectRequest(ctxWithTimeout, w, r, path)
if !ok {
return false
}
err := ctxWithTimeout.Err()
switch err {
case nil:
// nothing to do
case context.Canceled:
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Infof("client has canceled the request after %.3f seconds: remoteAddr=%s, requestURI: %q",
time.Since(startTime).Seconds(), remoteAddr, requestURI)
case context.DeadlineExceeded:
err = &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("the request couldn't be executed in %.3f seconds; possible solutions: "+
"to increase -search.maxQueryDuration=%s; to pass bigger value to 'timeout' query arg", d.Seconds(), maxQueryDuration),
StatusCode: http.StatusServiceUnavailable,
}
httpserver.Errorf(w, r, "%s", err)
default:
httpserver.Errorf(w, r, "unexpected error: %s", err)
}
return true
}
func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Request, path string) bool {
httpserver.EnableCORS(w, r)
switch path {
case "/select/logsql/field_names":
@ -165,6 +201,10 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
logsqlStreamFieldValuesRequests.Inc()
logsql.ProcessStreamFieldValuesRequest(ctx, w, r)
return true
case "/select/logsql/stream_ids":
logsqlStreamIDsRequests.Inc()
logsql.ProcessStreamIDsRequest(ctx, w, r)
return true
case "/select/logsql/streams":
logsqlStreamsRequests.Inc()
logsql.ProcessStreamsRequest(ctx, w, r)
@ -194,5 +234,7 @@ var (
logsqlQueryRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/query"}`)
logsqlStreamFieldNamesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_names"}`)
logsqlStreamFieldValuesRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_field_values"}`)
logsqlStreamIDsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/stream_ids"}`)
logsqlStreamsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/streams"}`)
logsqlTailRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/logsql/tail"}`)
)

View file

@ -1,13 +1,13 @@
{
"files": {
"main.css": "./static/css/main.1041c3d4.css",
"main.js": "./static/js/main.e54f9531.js",
"main.js": "./static/js/main.8988988c.js",
"static/js/685.bebe1265.chunk.js": "./static/js/685.bebe1265.chunk.js",
"static/media/MetricsQL.md": "./static/media/MetricsQL.cb83d071da309a358bc0.md",
"static/media/MetricsQL.md": "./static/media/MetricsQL.aaabf95f2c9bf356bde4.md",
"index.html": "./index.html"
},
"entrypoints": [
"static/css/main.1041c3d4.css",
"static/js/main.e54f9531.js"
"static/js/main.8988988c.js"
]
}

View file

@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.e54f9531.js"></script><link href="./static/css/main.1041c3d4.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.8988988c.js"></script><link href="./static/css/main.1041c3d4.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>

File diff suppressed because one or more lines are too long

View file

@ -5,7 +5,7 @@
*/
/**
* @remix-run/router v1.15.1
* @remix-run/router v1.17.0
*
* Copyright (c) Remix Software Inc.
*
@ -16,7 +16,7 @@
*/
/**
* React Router DOM v6.22.1
* React Router DOM v6.24.0
*
* Copyright (c) Remix Software Inc.
*
@ -27,7 +27,7 @@
*/
/**
* React Router v6.22.1
* React Router v6.24.0
*
* Copyright (c) Remix Software Inc.
*

File diff suppressed because one or more lines are too long

View file

@ -107,7 +107,7 @@ The list of MetricsQL features on top of PromQL:
* Trailing commas on all the lists are allowed - label filters, function args and with expressions.
For instance, the following queries are valid: `m{foo="bar",}`, `f(a, b,)`, `WITH (x=y,) x`.
This simplifies maintenance of multi-line queries.
* Metric names and label names may contain any unicode letter. For example `температура{город="Київ"}` is a value MetricsQL expression.
* Metric names and label names may contain any unicode letter. For example `температура{город="Київ"}` is a valid MetricsQL expression.
* Metric names and label names may contain escaped chars. For example, `foo\-bar{baz\=aa="b"}` is a valid expression.
It returns time series with name `foo-bar` containing label `baz=aa` with value `b`.
Additionally, the following escape sequences are supported:

View file

@ -145,6 +145,13 @@ func GetStreams(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstor
return strg.GetStreams(ctx, tenantIDs, q, limit)
}
// GetStreamIDs executes q and returns streamIDs seen in query results.
//
// If limit > 0, then up to limit unique streamIDs are returned.
func GetStreamIDs(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit uint64) ([]logstorage.ValueWithHits, error) {
return strg.GetStreamIDs(ctx, tenantIDs, q, limit)
}
func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)

View file

@ -138,7 +138,7 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
timerpool.Put(t)
remoteAddr := httpserver.GetQuotedRemoteAddr(r)
requestURI := httpserver.GetRequestURI(r)
logger.Infof("client has cancelled the request after %.3f seconds: remoteAddr=%s, requestURI: %q",
logger.Infof("client has canceled the request after %.3f seconds: remoteAddr=%s, requestURI: %q",
time.Since(startTime).Seconds(), remoteAddr, requestURI)
return true
case <-t.C:

View file

@ -15,7 +15,7 @@ import (
var (
maxExportDuration = flag.Duration("search.maxExportDuration", time.Hour*24*30, "The maximum duration for /api/v1/export call")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution")
maxQueryDuration = flag.Duration("search.maxQueryDuration", time.Second*30, "The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg")
maxStatusRequestDuration = flag.Duration("search.maxStatusRequestDuration", time.Minute*5, "The maximum duration for /api/v1/status/* requests")
maxLabelsAPIDuration = flag.Duration("search.maxLabelsAPIDuration", time.Second*5, "The maximum duration for /api/v1/labels, /api/v1/label/.../values and /api/v1/series requests. "+
"See also -search.maxLabelsAPISeries and -search.ignoreExtraFiltersAtLabelsAPI")

View file

@ -40,7 +40,7 @@ services:
# storing logs and serving read queries.
victorialogs:
container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
command:
- "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- victorialogs-filebeat-docker-vl:/vlogs
ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- victorialogs-filebeat-syslog-vl:/vlogs
ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- victorialogs-fluentbit-vl:/vlogs
ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- victorialogs-logstash-vl:/vlogs
ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140"
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- victorialogs-promtail-docker:/vlogs
ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- victorialogs-vector-docker-vl:/vlogs
ports:

View file

@ -3,7 +3,7 @@ version: '3'
services:
# Run `make package-victoria-logs` to build victoria-logs image
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
volumes:
- vlogs:/vlogs
ports:

View file

@ -1532,7 +1532,7 @@ Below is the output for `/path/to/vmselect -help`:
-search.maxPointsSubqueryPerTimeseries int
The maximum number of points per series, which can be generated by subquery. See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3 (default 100000)
-search.maxQueryDuration duration
The maximum duration for query execution (default 30s)
The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg (default 30s)
-search.maxQueryLen size
The maximum search query length in bytes
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)

View file

@ -3187,7 +3187,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-search.maxPointsSubqueryPerTimeseries int
The maximum number of points per series, which can be generated by subquery. See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3 (default 100000)
-search.maxQueryDuration duration
The maximum duration for query execution (default 30s)
The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg (default 30s)
-search.maxQueryLen size
The maximum search query length in bytes
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)

View file

@ -3195,7 +3195,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
-search.maxPointsSubqueryPerTimeseries int
The maximum number of points per series, which can be generated by subquery. See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3 (default 100000)
-search.maxQueryDuration duration
The maximum duration for query execution (default 30s)
The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg (default 30s)
-search.maxQueryLen size
The maximum search query length in bytes
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 16384)

View file

@ -19,8 +19,15 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.24.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.24.0-victorialogs)
Released at 2024-06-27
* FEATURE: add `/select/logsql/tail` HTTP endpoint, which can be used for live tailing of [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/) results. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) for details.
* FEATURE: add `/select/logsql/stream_ids` HTTP endpoint, which can be used for returning [`_stream_id` values](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the number of hits for the given [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/). See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#querying-stream_ids) for details.
* FEATURE: add `-retention.maxDiskSpaceUsageBytes` command-line flag, which allows limiting disk space usage for [VictoriaLogs data](https://docs.victoriametrics.com/victorialogs/#storage) by automatically dropping the oldest per-day partitions if the storage disk space usage becomes bigger than `-retention.maxDiskSpaceUsageBytes`. See [these docs](https://docs.victoriametrics.com/victorialogs/#retention-by-disk-space-usage).
* BUGFIX: properly take into account the query timeout specified via `-search.maxQueryDuration` command-line flag and/or via `timeout` query arg. Previously these timeouts could be ignored during query execution.
* BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix the update of the relative time range when `Execute Query` is clicked. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6345).
## [v0.23.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.23.0-victorialogs)

View file

@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.23.0-victorialogs/victoria-logs-linux-amd64-v0.23.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.23.0-victorialogs.tar.gz
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.24.0-victorialogs/victoria-logs-linux-amd64-v0.24.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.24.0-victorialogs.tar.gz
./victoria-logs-prod
```
@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.23.0-victorialogs
docker.io/victoriametrics/victoria-logs:v0.24.0-victorialogs
```
See also:

View file

@ -15,9 +15,9 @@ VictoriaLogs provides the following features:
- VictoriaLogs can accept logs from popular log collectors. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/).
- VictoriaLogs is much easier to set up and operate compared to Elasticsearch and Grafana Loki.
See [these docs](https://docs.victoriametrics.com/victorialogs/quickstart/).
- VictoriaLogs provides easy yet powerful query language with full-text search capabilities across
all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) -
see [LogsQL docs](https://docs.victoriametrics.com/victorialogs/logsql/).
- VictoriaLogs provides an easy yet powerful query language with full-text search across
all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
See [LogsQL docs](https://docs.victoriametrics.com/victorialogs/logsql/).
- VictoriaLogs can be seamlessly combined with good old Unix tools for log analysis such as `grep`, `less`, `sort`, `jq`, etc.
See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line) for details.
- VictoriaLogs capacity and performance scales linearly with the available resources (CPU, RAM, disk IO, disk space).
@ -28,7 +28,8 @@ VictoriaLogs provides the following features:
such as `trace_id`, `user_id` and `ip`.
- VictoriaLogs supports multitenancy - see [these docs](#multitenancy).
- VictoriaLogs supports out-of-order logs' ingestion aka backfilling.
- VictoriaLogs provides a simple web UI for querying logs - see [these docs](https://docs.victoriametrics.com/victorialogs/querying/#web-ui).
- VictoriaLogs supports live tailing for newly ingested logs. See [these docs](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing).
- VictoriaLogs provides a web UI for querying logs - see [these docs](https://docs.victoriametrics.com/victorialogs/querying/#web-ui).
VictoriaLogs is at the Preview stage now. It is ready for evaluation in production and for verifying the claims given above.
It isn't yet recommended to migrate from existing logging solutions to VictoriaLogs Preview in general cases.
@ -300,7 +301,7 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
-search.maxConcurrentRequests int
The maximum number of concurrent search requests. It shouldn't be high, since a single request can saturate all the CPU cores, while many concurrently executed requests may require high amounts of memory. See also -search.maxQueueDuration (default 16)
-search.maxQueryDuration duration
The maximum duration for query execution (default 30s)
The maximum duration for query execution. It can be overridden on a per-query basis via 'timeout' query arg (default 30s)
-search.maxQueueDuration duration
The maximum time the search request waits for execution when -search.maxConcurrentRequests limit is reached; see also -search.maxQueryDuration (default 10s)
-storage.minFreeDiskSpaceBytes size

View file

@ -35,7 +35,6 @@ The following functionality is planned in the future versions of VictoriaLogs:
- Journald (systemd)
- Add missing functionality to [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/):
- [Stream context](https://docs.victoriametrics.com/victorialogs/logsql/#stream-context).
- Live tailing for [LogsQL filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) aka `tail -f`.
- Web UI with the following abilities:
- Explore the ingested logs ([partially done](https://docs.victoriametrics.com/victorialogs/querying/#web-ui)).
- Build graphs over time for the ingested logs via [hits HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-hits-stats).

View file

@ -61,6 +61,13 @@ which parses syslog timestamps in `rfc3164` using `Europe/Berlin` timezone:
./victoria-logs -syslog.listenAddr.tcp=:514 -syslog.timezone='Europe/Berlin'
```
The ingested logs can be queried via [logs querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api). For example, the following command
returns ingested logs for the last 5 minutes by using [time filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter):
```sh
curl http://localhost:9428/select/logsql/query -d 'query=_time:5m'
```
See also:
- [Security](#security)

View file

@ -25,11 +25,13 @@ via the following ways:
VictoriaLogs provides the following HTTP endpoints:
- [`/select/logsql/query`](#querying-logs) for querying logs
- [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range
- [`/select/logsql/streams`](#querying-streams) for querying [log streams](#https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
- [`/select/logsql/stream_field_names`](#querying-stream-field-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names
- [`/select/logsql/stream_field_values`](#querying-stream-field-values) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values
- [`/select/logsql/query`](#querying-logs) for querying logs.
- [`/select/logsql/tail`](#live-tailing) for live tailing of query results.
- [`/select/logsql/hits`](#querying-hits-stats) for querying log hits stats over the given time range.
- [`/select/logsql/stream_ids`](#querying-stream_ids) for querying `_stream_id` values of [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
- [`/select/logsql/streams`](#querying-streams) for querying [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
- [`/select/logsql/stream_field_names`](#querying-stream-field-names) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field names.
- [`/select/logsql/stream_field_values`](#querying-stream-field-values) for querying [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) field values.
- [`/select/logsql/field_names`](#querying-field-names) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) names.
- [`/select/logsql/field_values`](#querying-field-values) for querying [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values.
@ -95,6 +97,14 @@ Query results can be sorted in the following ways:
- By adding [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) to the query.
- By using Unix `sort` command at client side according to [these docs](#command-line).
The maximum query execution time is limited by `-search.maxQueryDuration` command-line flag value. This limit can be overridden to smaller values
on a per-query basis by passing the needed timeout via `timeout` query arg. For example, the following command limits query execution time
to 4.2 seconds:
```sh
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'timeout=4.2s'
```
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query searches
for log messages in the `(AccountID=12, ProjectID=34)` tenant:
@ -116,9 +126,46 @@ See also:
- [Querying field values](#querying-field-values)
### Live tailing
VictoriaLogs provides `/select/logsql/tail?query=<query>` HTTP endpoint, which returns live tailing results for the given [`<query>`](https://docs.victoriametrics.com/victorialogs/logsql/),
i.e. it works similarly to the `tail -f` Unix command. For example, the following command live tails logs containing the `error` word:
```sh
curl http://localhost:9428/select/logsql/tail -d 'query=error'
```
The `<query>` must conform to the following restrictions:
- It cannot contain [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes), which modify the number of returned results
or the order of the returned results, such as [`stats`](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe),
[`limit`](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe), [`sort`](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe),
[`uniq`](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe), [`top`](https://docs.victoriametrics.com/victorialogs/logsql/#top-pipe),
[`unroll`](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe), etc. pipes.
- It must return [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_stream_id`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
fields, i.e. these fields must be kept when using [`fields`](https://docs.victoriametrics.com/victorialogs/logsql/#fields-pipe),
[`delete`](https://docs.victoriametrics.com/victorialogs/logsql/#delete-pipe) or [`rename`](https://docs.victoriametrics.com/victorialogs/logsql/#rename-pipe) pipes.
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query performs live tailing
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/tail -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=error'
```
The number of currently executed live tailing requests to `/select/logsql/tail` can be [monitored](https://docs.victoriametrics.com/victorialogs/#monitoring)
with the `vl_live_tailing_requests` metric.
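Live tailing results can also be consumed programmatically with any HTTP client. Below is a minimal Go sketch; it assumes VictoriaLogs listens on `localhost:9428` and that the response is streamed as newline-delimited JSON rows, the same encoding as `/select/logsql/query` results:
```go
package main

import (
	"bufio"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
)

func main() {
	// Start live tailing of logs containing the `error` word.
	// The address and query below are illustrative.
	args := url.Values{"query": {"error"}}
	resp, err := http.Post("http://localhost:9428/select/logsql/tail",
		"application/x-www-form-urlencoded", strings.NewReader(args.Encode()))
	if err != nil {
		log.Fatalf("cannot start live tailing: %s", err)
	}
	defer resp.Body.Close()

	// Read the stream line by line; every line is expected to be a JSON-encoded log row.
	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 1024*1024), 1024*1024) // allow long log lines
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		log.Fatalf("cannot read the tail stream: %s", err)
	}
}
```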
See also:
- [Querying logs](#querying-logs)
- [Querying streams](#querying-streams)
### Querying hits stats
VictoriaMetrics provides `/select/logsql/hits?query=<query>&start=<start>&end=<end>&step=<step>` HTTP endpoint, which returns the number
VictoriaLogs provides `/select/logsql/hits?query=<query>&start=<start>&end=<end>&step=<step>` HTTP endpoint, which returns the number
of matching log entries for the given [`<query>`](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]`
time range grouped by `<step>` buckets. The returned results are sorted by time.
@ -213,17 +260,83 @@ The grouped fields are put inside `"fields"` object:
}
```
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns hits stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/hits -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=error'
```
See also:
- [Querying logs](#querying-logs)
- [Querying streams](#querying-streams)
- [HTTP API](#http-api)
### Querying stream_ids
VictoriaLogs provides `/select/logsql/stream_ids?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns `_stream_id` values
for the [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) from results
of the given [`<query>`](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of matching log entries for every `_stream_id`.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals the minimum timestamp across logs stored in VictoriaLogs.
If `<end>` is missing, then it equals the maximum timestamp across logs stored in VictoriaLogs.
For example, the following command returns `_stream_id` values across logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
for the last 5 minutes:
```sh
curl http://localhost:9428/select/logsql/stream_ids -d 'query=error' -d 'start=5m'
```
Below is an example JSON output returned from this endpoint:
```json
{
"values": [
{
"value": "0000000000000000106955b1744a71b78bd3a88c755751e8",
"hits": 442953
},
{
"value": "0000000000000000b80988e6012df3520a8e20cd5353c52b",
"hits": 59349
},
{
"value": "0000000000000000f8d02151e40a6cbbb1edb2050ea910ba",
"hits": 59277
}
]
}
```
The `/select/logsql/stream_ids` endpoint supports an optional `limit=N` query arg, which allows limiting the number of returned `_stream_id` values to `N`.
The endpoint returns an arbitrary subset of `_stream_id` values if their number exceeds `N`, so `limit=N` cannot be used for pagination over a big number of `_stream_id` values.
When the `limit` is reached, `hits` are zeroed, since they cannot be calculated reliably.
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns `_stream_id` stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/stream_ids -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=_time:5m'
```
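The response can be decoded with a few lines of Go. Below is a sketch; the struct mirrors the `values` / `value` / `hits` fields from the JSON example above, while the address and query args are illustrative:
```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"net/url"
	"strings"
)

// streamIDsResponse mirrors the JSON returned by /select/logsql/stream_ids.
type streamIDsResponse struct {
	Values []struct {
		Value string `json:"value"`
		Hits  uint64 `json:"hits"`
	} `json:"values"`
}

func main() {
	// Request _stream_id values for logs with the `error` word over the last 5 minutes.
	args := url.Values{"query": {"error"}, "start": {"5m"}}
	resp, err := http.Post("http://localhost:9428/select/logsql/stream_ids",
		"application/x-www-form-urlencoded", strings.NewReader(args.Encode()))
	if err != nil {
		log.Fatalf("cannot query stream_ids: %s", err)
	}
	defer resp.Body.Close()

	var sr streamIDsResponse
	if err := json.NewDecoder(resp.Body).Decode(&sr); err != nil {
		log.Fatalf("cannot decode the response: %s", err)
	}
	for _, v := range sr.Values {
		fmt.Printf("%s: %d hits\n", v.Value, v.Hits)
	}
}
```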
See also:
- [Querying streams](#querying-streams)
- [Querying logs](#querying-logs)
- [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api)
### Querying streams
VictoriaLogs provides `/select/logsql/streams?query=<query>&start=<start>&end=<end>` HTTP endpoint, which returns [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
from results of the given [`<query>`](https://docs.victoriametrics.com/victorialogs/logsql/) on the given `[<start> ... <end>]` time range.
The response also contains the number of log results per every `stream`.
The response also contains the number of matching log entries for every `_stream`.
The `<start>` and `<end>` args can contain values in [any supported format](https://docs.victoriametrics.com/#timestamp-formats).
If `<start>` is missing, then it equals the minimum timestamp across logs stored in VictoriaLogs.
@ -258,10 +371,20 @@ Below is an example JSON output returned from this endpoint:
```
The `/select/logsql/streams` endpoint supports an optional `limit=N` query arg, which allows limiting the number of returned streams to `N`.
The endpoint returns arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over big number of streams.
The endpoint returns an arbitrary subset of streams if their number exceeds `N`, so `limit=N` cannot be used for pagination over a big number of streams.
When the `limit` is reached, `hits` are zeroed, since they cannot be calculated reliably.
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns stream stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/streams -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=_time:5m'
```
See also:
- [Querying stream_ids](#querying-stream_ids)
- [Querying logs](#querying-logs)
- [Querying hits stats](#querying-hits-stats)
- [HTTP API](#http-api)
@ -305,6 +428,14 @@ Below is an example JSON output returned from this endpoint:
}
```
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns stream field names stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/stream_field_names -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=_time:5m'
```
See also:
- [Querying stream field names](#querying-stream-field-names)
@ -349,6 +480,15 @@ Below is an example JSON output returned from this endpoint:
The `/select/logsql/stream_field_values` endpoint supports an optional `limit=N` query arg, which allows limiting the number of returned values to `N`.
The endpoint returns an arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over a big number of field values.
When the `limit` is reached, `hits` are zeroed, since they cannot be calculated reliably.
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns stream field values stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/stream_field_values -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=_time:5m'
```
See also:
@ -395,6 +535,14 @@ Below is an example JSON output returned from this endpoint:
}
```
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns field names stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/field_names -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=_time:5m'
```
See also:
- [Querying stream field names](#querying-stream-field-names)
@ -445,6 +593,14 @@ The `/select/logsql/field_names` endpoint supports optional `limit=N` query arg,
The endpoint returns an arbitrary subset of values if their number exceeds `N`, so `limit=N` cannot be used for pagination over a big number of field values.
When the `limit` is reached, `hits` are zeroed, since they cannot be calculated reliably.
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
If you need to query another tenant, then specify it via the `AccountID` and `ProjectID` HTTP request headers. For example, the following query returns field values stats
for the `(AccountID=12, ProjectID=34)` tenant:
```sh
curl http://localhost:9428/select/logsql/field_values -H 'AccountID: 12' -H 'ProjectID: 34' -d 'query=_time:5m'
```
See also:
- [Querying stream field values](#querying-stream-field-values)
@ -455,19 +611,16 @@ See also:
## Web UI
VictoriaLogs provides a simple Web UI for logs [querying](https://docs.victoriametrics.com/victorialogs/logsql/) and exploration
at `http://localhost:9428/select/vmui`. The UI allows exploring query results:
<img src="vmui.webp" />
VictoriaLogs provides a Web UI for logs [querying](https://docs.victoriametrics.com/victorialogs/logsql/) and exploration
at `http://localhost:9428/select/vmui`.
There are three modes of displaying query results:
- `Group` - results are displayed as a table with rows grouped by stream and fields for filtering.
- `Group` - results are displayed as a table with rows grouped by [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
- `Table` - displays query results as a table.
- `JSON` - displays raw JSON response from [HTTP API](#http-api).
- `JSON` - displays raw JSON response from [`/select/logsql/query` HTTP API](#querying-logs).
This is the first version that has minimal functionality and may contain bugs.
It is recommended trying [command line interface](#command-line), which has no known bugs :)
See also [command line interface](#command-line).
## Command-line

View file

@ -234,6 +234,16 @@ func (q *Query) String() string {
return s
}
// CanLiveTail returns true if q can be used in live tailing
func (q *Query) CanLiveTail() bool {
for _, p := range q.pipes {
if !p.canLiveTail() {
return false
}
}
return true
}
func (q *Query) getStreamIDs() []streamID {
switch t := q.f.(type) {
case *filterAnd:

View file

@ -2034,3 +2034,46 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
f("* | field_values x", false)
}
func TestQueryCanLiveTail(t *testing.T) {
f := func(qStr string, resultExpected bool) {
t.Helper()
q, err := ParseQuery(qStr)
if err != nil {
t.Fatalf("cannot parse [%s]: %s", qStr, err)
}
result := q.CanLiveTail()
if result != resultExpected {
t.Fatalf("unexpected result for CanLiveTail(%q); got %v; want %v", qStr, result, resultExpected)
}
}
f("foo", true)
f("* | copy a b", true)
f("* | rm a, b", true)
f("* | drop_empty_fields", true)
f("* | extract 'foo<bar>baz'", true)
f("* | extract_regexp 'foo(?P<bar>baz)'", true)
f("* | field_names a", false)
f("* | fields a, b", true)
f("* | field_values a", false)
f("* | filter foo", true)
f("* | format 'a<b>c'", true)
f("* | limit 10", false)
f("* | math a/b as c", true)
f("* | offset 10", false)
f("* | pack_json", true)
f("* | pack_logfmt", true)
f("* | rename a b", true)
f("* | replace ('foo', 'bar')", true)
f("* | replace_regexp ('foo', 'bar')", true)
f("* | sort by (a)", false)
f("* | stats count() rows", false)
f("* | top 10 by (x)", false)
f("* | uniq by (a)", false)
f("* | unpack_json", true)
f("* | unpack_logfmt", true)
f("* | unpack_syslog", true)
f("* | unroll by (a)", false)
}

View file

@ -8,6 +8,11 @@ type pipe interface {
// String returns string representation of the pipe.
String() string
// canLiveTail must return true if the given pipe can be used in live tailing
//
// See https://docs.victoriametrics.com/victorialogs/querying/#live-tailing
canLiveTail() bool
// updateNeededFields must update neededFields and unneededFields with fields it needs and not needs at the input.
updateNeededFields(neededFields, unneededFields fieldsSet)

View file

@ -31,6 +31,10 @@ func (pc *pipeCopy) String() string {
return "copy " + strings.Join(a, ", ")
}
func (pc *pipeCopy) canLiveTail() bool {
return true
}
func (pc *pipeCopy) updateNeededFields(neededFields, unneededFields fieldsSet) {
for i := len(pc.srcFields) - 1; i >= 0; i-- {
srcField := pc.srcFields[i]

View file

@ -22,6 +22,10 @@ func (pd *pipeDelete) String() string {
return "delete " + fieldNamesString(pd.fields)
}
func (pd *pipeDelete) canLiveTail() bool {
return true
}
func (pd *pipeDelete) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFields.addFields(pd.fields)

View file

@ -17,6 +17,10 @@ func (pd *pipeDropEmptyFields) String() string {
return "drop_empty_fields"
}
func (pd *pipeDropEmptyFields) canLiveTail() bool {
return true
}
func (pd *pipeDropEmptyFields) optimize() {
// nothing to do
}

View file

@ -41,6 +41,10 @@ func (pe *pipeExtract) String() string {
return s
}
func (pe *pipeExtract) canLiveTail() bool {
return true
}
func (pe *pipeExtract) optimize() {
pe.iff.optimizeFilterIn()
}

View file

@ -43,6 +43,10 @@ func (pe *pipeExtractRegexp) String() string {
return s
}
func (pe *pipeExtractRegexp) canLiveTail() bool {
return true
}
func (pe *pipeExtractRegexp) optimize() {
pe.iff.optimizeFilterIn()
}

View file

@ -28,6 +28,10 @@ func (pf *pipeFieldNames) String() string {
return s
}
func (pf *pipeFieldNames) canLiveTail() bool {
return false
}
func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.add("*")
unneededFields.reset()

View file

@ -21,6 +21,10 @@ func (pf *pipeFieldValues) String() string {
return s
}
func (pf *pipeFieldValues) canLiveTail() bool {
return false
}
func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
neededFields.add(pf.field)

View file

@ -25,6 +25,10 @@ func (pf *pipeFields) String() string {
return "fields " + fieldNamesString(pf.fields)
}
func (pf *pipeFields) canLiveTail() bool {
return true
}
func (pf *pipeFields) updateNeededFields(neededFields, unneededFields fieldsSet) {
if pf.containsStar {
return

View file

@ -17,6 +17,10 @@ func (pf *pipeFilter) String() string {
return "filter " + pf.f.String()
}
func (pf *pipeFilter) canLiveTail() bool {
return true
}
func (pf *pipeFilter) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
fs := newFieldsSet()

View file

@ -43,6 +43,10 @@ func (pf *pipeFormat) String() string {
return s
}
func (pf *pipeFormat) canLiveTail() bool {
return true
}
func (pf *pipeFormat) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
if pf.iff != nil {

View file

@ -16,6 +16,10 @@ func (pl *pipeLimit) String() string {
return fmt.Sprintf("limit %d", pl.limit)
}
func (pl *pipeLimit) canLiveTail() bool {
return false
}
func (pl *pipeLimit) updateNeededFields(_, _ fieldsSet) {
// nothing to do
}

View file

@ -66,6 +66,10 @@ func (pm *pipeMath) String() string {
return s
}
func (pm *pipeMath) canLiveTail() bool {
return true
}
func (me *mathEntry) String() string {
s := me.expr.String()
if isMathBinaryOp(me.expr.op) {

View file

@ -16,6 +16,10 @@ func (po *pipeOffset) String() string {
return fmt.Sprintf("offset %d", po.offset)
}
func (po *pipeOffset) canLiveTail() bool {
return false
}
func (po *pipeOffset) updateNeededFields(_, _ fieldsSet) {
// nothing to do
}

View file

@ -25,6 +25,10 @@ func (pp *pipePackJSON) String() string {
return s
}
func (pp *pipePackJSON) canLiveTail() bool {
return true
}
func (pp *pipePackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
}

View file

@ -25,6 +25,10 @@ func (pp *pipePackLogfmt) String() string {
return s
}
func (pp *pipePackLogfmt) canLiveTail() bool {
return true
}
func (pp *pipePackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForPipePack(neededFields, unneededFields, pp.resultField, pp.fields)
}

View file

@ -31,6 +31,10 @@ func (pr *pipeRename) String() string {
return "rename " + strings.Join(a, ", ")
}
func (pr *pipeRename) canLiveTail() bool {
return true
}
func (pr *pipeRename) updateNeededFields(neededFields, unneededFields fieldsSet) {
for i := len(pr.srcFields) - 1; i >= 0; i-- {
srcField := pr.srcFields[i]

View file

@ -37,6 +37,10 @@ func (pr *pipeReplace) String() string {
return s
}
func (pr *pipeReplace) canLiveTail() bool {
return true
}
func (pr *pipeReplace) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUpdatePipe(neededFields, unneededFields, pr.field, pr.iff)
}

View file

@ -37,6 +37,10 @@ func (pr *pipeReplaceRegexp) String() string {
return s
}
func (pr *pipeReplaceRegexp) canLiveTail() bool {
return true
}
func (pr *pipeReplaceRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUpdatePipe(neededFields, unneededFields, pr.field, pr.iff)
}

View file

@ -55,6 +55,10 @@ func (ps *pipeSort) String() string {
return s
}
func (ps *pipeSort) canLiveTail() bool {
return false
}
func (ps *pipeSort) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.isEmpty() {
return

View file

@ -95,6 +95,10 @@ func (ps *pipeStats) String() string {
return s
}
func (ps *pipeStats) canLiveTail() bool {
return false
}
func (ps *pipeStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFieldsOrig := neededFields.clone()
neededFields.reset()

View file

@ -45,6 +45,10 @@ func (pt *pipeTop) String() string {
return s
}
func (pt *pipeTop) canLiveTail() bool {
return false
}
func (pt *pipeTop) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.reset()
unneededFields.reset()

View file

@ -40,6 +40,10 @@ func (pu *pipeUniq) String() string {
return s
}
func (pu *pipeUniq) canLiveTail() bool {
return false
}
func (pu *pipeUniq) updateNeededFields(neededFields, unneededFields fieldsSet) {
neededFields.reset()
unneededFields.reset()

View file

@ -52,6 +52,10 @@ func (pu *pipeUnpackJSON) String() string {
return s
}
func (pu *pipeUnpackJSON) canLiveTail() bool {
return true
}
func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, neededFields, unneededFields)
}

View file

@ -50,6 +50,10 @@ func (pu *pipeUnpackLogfmt) String() string {
return s
}
func (pu *pipeUnpackLogfmt) canLiveTail() bool {
return true
}
func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, pu.fields, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff, neededFields, unneededFields)
}

View file

@ -46,6 +46,10 @@ func (pu *pipeUnpackSyslog) String() string {
return s
}
func (pu *pipeUnpackSyslog) canLiveTail() bool {
return true
}
func (pu *pipeUnpackSyslog) updateNeededFields(neededFields, unneededFields fieldsSet) {
updateNeededFieldsForUnpackPipe(pu.fromField, nil, pu.keepOriginalFields, false, pu.iff, neededFields, unneededFields)
}

View file

@ -32,6 +32,10 @@ func (pu *pipeUnroll) String() string {
return s
}
func (pu *pipeUnroll) canLiveTail() bool {
return false
}
func (pu *pipeUnroll) optimize() {
pu.iff.optimizeFilterIn()
}

View file

@ -357,6 +357,13 @@ func (s *Storage) GetStreams(ctx context.Context, tenantIDs []TenantID, q *Query
return s.GetFieldValues(ctx, tenantIDs, q, "_stream", limit)
}
// GetStreamIDs returns _stream_id field values from q results for the given tenantIDs.
//
// If limit > 0, then up to limit unique _stream_id values are returned.
func (s *Storage) GetStreamIDs(ctx context.Context, tenantIDs []TenantID, q *Query, limit uint64) ([]ValueWithHits, error) {
return s.GetFieldValues(ctx, tenantIDs, q, "_stream_id", limit)
}
func (s *Storage) runValuesWithHitsQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex

View file

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"sync"
"sync/atomic"
@ -316,12 +317,12 @@ func TestStorageRunQuery(t *testing.T) {
})
t.Run("field_names-all", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
results, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{"_msg", 1155},
{"_stream", 1155},
{"_stream_id", 1155},
@ -332,18 +333,18 @@ func TestStorageRunQuery(t *testing.T) {
{"stream-id", 1155},
{"tenant.id", 1155},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_names-some", func(t *testing.T) {
q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`)
names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
results, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{"_msg", 385},
{"_stream", 385},
{"_stream_id", 385},
@ -354,85 +355,85 @@ func TestStorageRunQuery(t *testing.T) {
{"stream-id", 385},
{"tenant.id", 385},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 0},
{`{instance="host-1:234",job="foobar"}`, 0},
{`{instance="host-2:234",job="foobar"}`, 0},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("field_values-limit", func(t *testing.T) {
q := mustParseQuery("instance:='host-1:234'")
values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
results, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{`{instance="host-1:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_field_names", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
results, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{"instance", 1155},
{"job", 1155},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_field_values-nolimit", func(t *testing.T) {
q := mustParseQuery("*")
values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
results, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_field_values-limit", func(t *testing.T) {
@ -442,29 +443,53 @@ func TestStorageRunQuery(t *testing.T) {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{`host-0:234`, 385},
{`host-1:234`, 385},
{`host-2:234`, 385},
}
if !reflect.DeepEqual(values, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
if !reflect.DeepEqual(values, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultsExpected)
}
})
t.Run("streams", func(t *testing.T) {
q := mustParseQuery("*")
names, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
results, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
resultExpected := []ValueWithHits{
resultsExpected := []ValueWithHits{
{`{instance="host-0:234",job="foobar"}`, 385},
{`{instance="host-1:234",job="foobar"}`, 385},
{`{instance="host-2:234",job="foobar"}`, 385},
}
if !reflect.DeepEqual(names, resultExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})
t.Run("stream_ids", func(t *testing.T) {
q := mustParseQuery("*")
results, err := s.GetStreamIDs(context.Background(), allTenantIDs, q, 0)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
// Verify the first 5 results with the smallest _stream_id value.
sort.Slice(results, func(i, j int) bool {
return results[i].Value < results[j].Value
})
results = results[:5]
resultsExpected := []ValueWithHits{
{"000000000000000140c1914be0226f8185f5b00551fb3b2d", 35},
{"000000000000000177edafcd46385c778b57476eb5b92233", 35},
{"0000000000000001f5b4cae620b5e85d6ef5f2107fe00274", 35},
{"000000010000000b40c1914be0226f8185f5b00551fb3b2d", 35},
{"000000010000000b77edafcd46385c778b57476eb5b92233", 35},
}
if !reflect.DeepEqual(results, resultsExpected) {
t.Fatalf("unexpected result; got\n%v\nwant\n%v", results, resultsExpected)
}
})