mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
2270c42c82
commit
16a91539bd
4 changed files with 93 additions and 4 deletions
|
@ -22,7 +22,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
|||
* FEATURE: return all the log fields by default in query results. Previously only [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields), [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) and [`_msg`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) fields were returned by default.
|
||||
* FEATURE: add support for returning only the requested log [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#querying-specific-fields).
|
||||
* FEATURE: add support for calculating the number of matching logs and the number of logs with non-empty [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). Grouping by arbitrary set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) is supported. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#stats) for details.
|
||||
* FEATURE: add support for returning the first `N` results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
|
||||
* FEATURE: add support for limiting the number of returned results. See [these docs](https://docs.victoriametrics.com/victorialogs/logsql/#limiters).
|
||||
* FEATURE: optimize performance for [LogsQL query](https://docs.victoriametrics.com/victorialogs/logsql/), which contains multiple filters for [words](https://docs.victoriametrics.com/victorialogs/logsql/#word-filter) or [phrases](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) delimited with [`AND` operator](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, `foo AND bar` query must find [log messages](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) with `foo` and `bar` words at faster speed.
|
||||
|
||||
* BUGFIX: prevent additional CPU usage for up to a few seconds after the query is canceled.
|
||||
|
|
|
@ -1092,11 +1092,13 @@ See the [Roadmap](https://docs.victoriametrics.com/VictoriaLogs/Roadmap.html) fo
|
|||
|
||||
## Limiters
|
||||
|
||||
LogsQL provides the following functionality for limiting the amounts of returned log entries:
|
||||
LogsQL provides the following functionality for limiting the number of returned log entries:
|
||||
|
||||
- `error | head 10` - returns up to 10 log entries with the `error` [word](#word).
|
||||
- `error | skip 10` - skips the first 10 log entries with the `error` [word](#word).
|
||||
|
||||
LogsQL will support the ability to page the returned results.
|
||||
It is recommended to [sort](#sorting) entries before limiting the number of returned log entries,
|
||||
in order to get consistent results.
|
||||
|
||||
It is possible to limit the returned results with Unix commands such as `head`, `tail` and `less`
|
||||
according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying/#command-line).
|
||||
|
|
|
@ -820,6 +820,12 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
// multiple head pipes
|
||||
f(`foo | head 100 | head 10 | head 234`, `foo | head 100 | head 10 | head 234`)
|
||||
|
||||
// skip pipe
|
||||
f(`foo | skip 10`, `foo | skip 10`)
|
||||
|
||||
// multiple skip pipes
|
||||
f(`foo | skip 10 | skip 100`, `foo | skip 10 | skip 100`)
|
||||
|
||||
// stats count pipe
|
||||
f(`* | Stats count() AS foo`, `* | stats count() as foo`)
|
||||
f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count(*) as XYz`)
|
||||
|
@ -827,6 +833,7 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
|
||||
// multiple different pipes
|
||||
f(`* | fields foo, bar | head 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | head 100 | stats by (foo, bar) count(baz) as qwert`)
|
||||
f(`* | skip 100 | head 20 | skip 10`, `* | skip 100 | head 20 | skip 10`)
|
||||
}
|
||||
|
||||
func TestParseQueryFailure(t *testing.T) {
|
||||
|
@ -1046,6 +1053,13 @@ func TestParseQueryFailure(t *testing.T) {
|
|||
f(`foo | head bar`)
|
||||
f(`foo | head -123`)
|
||||
|
||||
// missing skip pipe value
|
||||
f(`foo | skip`)
|
||||
|
||||
// invalid skip pipe value
|
||||
f(`foo | skip bar`)
|
||||
f(`foo | skip -10`)
|
||||
|
||||
// missing stats
|
||||
f(`foo | stats`)
|
||||
|
||||
|
|
|
@ -86,6 +86,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
|
|||
return nil, fmt.Errorf("cannot parse 'head' pipe: %w", err)
|
||||
}
|
||||
pipes = append(pipes, hp)
|
||||
case lex.isKeyword("skip"):
|
||||
sp, err := parseSkipPipe(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'skip' pipe: %w", err)
|
||||
}
|
||||
pipes = append(pipes, sp)
|
||||
default:
|
||||
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
|
||||
}
|
||||
|
@ -647,7 +653,7 @@ func parseHeadPipe(lex *lexer) (*headPipe, error) {
|
|||
}
|
||||
n, err := strconv.ParseUint(lex.token, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse %q: %w", lex.token, err)
|
||||
return nil, fmt.Errorf("cannot parse the number of head rows to return %q: %w", lex.token, err)
|
||||
}
|
||||
lex.nextToken()
|
||||
hp := &headPipe{
|
||||
|
@ -656,6 +662,73 @@ func parseHeadPipe(lex *lexer) (*headPipe, error) {
|
|||
return hp, nil
|
||||
}
|
||||
|
||||
// skipPipe represents the 'skip N' pipe, which drops the first n rows
// from the query results before passing the rest downstream.
type skipPipe struct {
	// n is the number of leading rows to drop.
	n uint64
}

// String returns the canonical LogsQL representation of the pipe.
func (sp *skipPipe) String() string {
	s := fmt.Sprintf("skip %d", sp.n)
	return s
}
|
||||
|
||||
func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
|
||||
return &skipPipeProcessor{
|
||||
sp: sp,
|
||||
cancel: cancel,
|
||||
ppBase: ppBase,
|
||||
}
|
||||
}
|
||||
|
||||
// skipPipeProcessor processes the 'skip N' pipe.
// A single instance is shared by all the workers of a query.
type skipPipeProcessor struct {
	// sp holds the number of rows to skip.
	sp *skipPipe

	// cancel stops the query; called from flush().
	cancel func()

	// ppBase receives the rows remaining after the skip.
	ppBase pipeProcessor

	// rowsSkipped counts the total rows seen so far across all workers.
	rowsSkipped atomic.Uint64
}
|
||||
|
||||
// writeBlock forwards the given block of rows to ppBase after dropping
// the first spp.sp.n rows observed across all workers.
//
// The atomic Add assigns each incoming block a disjoint interval
// [total-len, total) in the global row sequence, so the skip boundary is
// applied exactly once even with concurrent workers.
// NOTE(review): with multiple workers the global row order is arbitrary,
// so WHICH rows get skipped depends on block arrival order - presumably
// acceptable before an explicit sort; confirm.
func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
	rowsSkipped := spp.rowsSkipped.Add(uint64(len(timestamps)))
	if rowsSkipped <= spp.sp.n {
		// The whole block falls before the skip boundary - drop it.
		return
	}

	// From here on rowsSkipped holds the row count seen BEFORE this block.
	rowsSkipped -= uint64(len(timestamps))
	if rowsSkipped >= spp.sp.n {
		// The whole block falls after the skip boundary - pass it through unchanged.
		spp.ppBase.writeBlock(workerID, timestamps, columns)
		return
	}

	// The skip boundary falls inside this block - drop its first
	// rowsRemaining rows and pass the rest downstream.
	rowsRemaining := spp.sp.n - rowsSkipped
	cs := make([]BlockColumn, len(columns))
	for i, c := range columns {
		cDst := &cs[i]
		cDst.Name = c.Name
		cDst.Values = c.Values[rowsRemaining:]
	}
	timestamps = timestamps[rowsRemaining:]
	spp.ppBase.writeBlock(workerID, timestamps, cs)
}
|
||||
|
||||
// flush propagates the flush call to the base pipe processor.
//
// NOTE(review): the cancel() call mirrors the head pipe, where it stops
// upstream work once the limit is reached; by flush time the query has
// already finished producing rows, so canceling here looks like a no-op
// for the skip pipe - confirm it is intentional.
func (spp *skipPipeProcessor) flush() {
	spp.cancel()
	spp.ppBase.flush()
}
|
||||
|
||||
func parseSkipPipe(lex *lexer) (*skipPipe, error) {
|
||||
if !lex.mustNextToken() {
|
||||
return nil, fmt.Errorf("missing the number of rows to skip")
|
||||
}
|
||||
n, err := strconv.ParseUint(lex.token, 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse the number of rows to skip %q: %w", lex.token, err)
|
||||
}
|
||||
lex.nextToken()
|
||||
sp := &skipPipe{
|
||||
n: n,
|
||||
}
|
||||
return sp, nil
|
||||
}
|
||||
|
||||
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
|
||||
if !lex.isKeyword("(") {
|
||||
return nil, fmt.Errorf("missing `(`")
|
||||
|
|
Loading…
Reference in a new issue