lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-24 23:27:12 +02:00
parent 5040cfaff3
commit de7450b7e0
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
17 changed files with 320 additions and 96 deletions

View file

@ -42,7 +42,7 @@ services:
# storing logs and serving read queries. # storing logs and serving read queries.
victorialogs: victorialogs:
container_name: victorialogs container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
command: command:
- "--storageDataPath=/vlogs" - "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428" - "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066 - -beat.uri=http://filebeat-victorialogs:5066
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-docker-vl:/vlogs - victorialogs-filebeat-docker-vl:/vlogs
ports: ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-syslog-vl:/vlogs - victorialogs-filebeat-syslog-vl:/vlogs
ports: ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- victorialogs-fluentbit-vl:/vlogs - victorialogs-fluentbit-vl:/vlogs
ports: ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- victorialogs-logstash-vl:/vlogs - victorialogs-logstash-vl:/vlogs
ports: ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140" - "5140:5140"
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- victorialogs-promtail-docker:/vlogs - victorialogs-promtail-docker:/vlogs
ports: ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy condition: service_healthy
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- victorialogs-vector-docker-vl:/vlogs - victorialogs-vector-docker-vl:/vlogs
ports: ports:

View file

@ -3,7 +3,7 @@ version: '3'
services: services:
# Run `make package-victoria-logs` to build victoria-logs image # Run `make package-victoria-logs` to build victoria-logs image
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes: volumes:
- vlogs:/vlogs - vlogs:/vlogs
ports: ports:

View file

@ -19,6 +19,13 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
## [v0.22.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.22.0-victorialogs)
Released at 2024-06-24
* FEATURE: allow specifying multiple `_stream_id` values in [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter) via `_stream_id:in(id1, ..., idN)` syntax.
* FEATURE: allow specifying a subquery for selecting `_stream_id` values inside [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter). For example, `_stream_id:in(_time:5m error | fields _stream_id)` returns logs for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) that contain the `error` word in logs for the last 5 minutes.
## [v0.21.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.21.0-victorialogs) ## [v0.21.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.21.0-victorialogs)
Released at 2024-06-20 Released at 2024-06-20

View file

@ -479,6 +479,20 @@ query selects logs for the given stream for the last hour:
_time:1h _stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7 _time:1h _stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7
``` ```
The `_stream_id` filter supports specifying multiple `_stream_id` values via `_stream_id:in(...)` syntax. For example:
```logsql
_stream_id:in(0000007b000001c850d9950ea6196b1a4812081265faa1c7, 1230007b456701c850d9950ea6196b1a4812081265fff2a9)
```
It is also possible to specify a subquery inside `in(...)`, which selects the needed `_stream_id` values. For example, the following query returns
logs for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `error` [word](#word)
in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) during the last 5 minutes:
```logsql
_stream_id:in(_time:5m error | fields _stream_id)
```
See also: See also:
- [stream filter](#stream-filter) - [stream filter](#stream-filter)

View file

@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh ```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.21.0-victorialogs/victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.22.0-victorialogs/victoria-logs-linux-amd64-v0.22.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz tar xzf victoria-logs-linux-amd64-v0.22.0-victorialogs.tar.gz
./victoria-logs-prod ./victoria-logs-prod
``` ```
@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh ```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
``` ```
See also: See also:

View file

@ -1,27 +1,76 @@
package logstorage package logstorage
import ( import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
) )
// filterStreamID is the filter for `_stream_id:id` // filterStreamID is the filter for `_stream_id:id`
type filterStreamID struct { type filterStreamID struct {
streamIDStr string streamIDs []streamID
// needExecuteQuery is set to true if q must be executed for populating streamIDs before filter execution.
needExecuteQuery bool
// If q is non-nil, then streamIDs must be populated from q before filter execution.
q *Query
// qFieldName must be set to the name of the field to obtain values from if q is non-nil.
qFieldName string
streamIDsMap map[string]struct{}
streamIDsMapOnce sync.Once
} }
func (fs *filterStreamID) String() string { func (fs *filterStreamID) String() string {
return "_stream_id:" + quoteTokenIfNeeded(fs.streamIDStr) if fs.q != nil {
return "_stream_id:in(" + fs.q.String() + ")"
}
streamIDs := fs.streamIDs
if len(streamIDs) == 1 {
return "_stream_id:" + string(streamIDs[0].marshalString(nil))
}
a := make([]string, len(streamIDs))
for i, streamID := range streamIDs {
a[i] = string(streamID.marshalString(nil))
}
return "_stream_id:in(" + strings.Join(a, ",") + ")"
} }
func (fs *filterStreamID) updateNeededFields(neededFields fieldsSet) { func (fs *filterStreamID) updateNeededFields(neededFields fieldsSet) {
neededFields.add("_stream_id") neededFields.add("_stream_id")
} }
// getStreamIDsMap returns the set of marshaled stream IDs used for matching.
//
// The set is built from fs.streamIDs on the first call via fs.streamIDsMapOnce;
// later calls return the already-built map.
func (fs *filterStreamID) getStreamIDsMap() map[string]struct{} {
	fs.streamIDsMapOnce.Do(func() {
		fs.initStreamIDsMap()
	})
	return fs.streamIDsMap
}
// initStreamIDsMap populates fs.streamIDsMap with one entry per item in fs.streamIDs.
//
// Each key is the string form returned by streamID.marshalString.
func (fs *filterStreamID) initStreamIDsMap() {
	streamIDsMap := make(map[string]struct{}, len(fs.streamIDs))
	for _, sid := range fs.streamIDs {
		key := string(sid.marshalString(nil))
		streamIDsMap[key] = struct{}{}
	}
	fs.streamIDsMap = streamIDsMap
}
func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) { func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
m := fs.getStreamIDsMap()
if len(m) == 0 {
bm.resetBits()
return
}
c := br.getColumnByName("_stream_id") c := br.getColumnByName("_stream_id")
if c.isConst { if c.isConst {
v := c.valuesEncoded[0] v := c.valuesEncoded[0]
if fs.streamIDStr != v { if _, ok := m[v]; !ok {
bm.resetBits() bm.resetBits()
} }
return return
@ -36,16 +85,18 @@ func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
values := c.getValues(br) values := c.getValues(br)
bm.forEachSetBit(func(idx int) bool { bm.forEachSetBit(func(idx int) bool {
v := values[idx] v := values[idx]
return fs.streamIDStr == v _, ok := m[v]
return ok
}) })
case valueTypeDict: case valueTypeDict:
bb := bbPool.Get() bb := bbPool.Get()
for _, v := range c.dictValues { for _, v := range c.dictValues {
c := byte(0) ch := byte(0)
if fs.streamIDStr == v { _, ok := m[v]
c = 1 if ok {
ch = 1
} }
bb.B = append(bb.B, c) bb.B = append(bb.B, ch)
} }
valuesEncoded := c.getValuesEncoded(br) valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(idx int) bool { bm.forEachSetBit(func(idx int) bool {
@ -73,10 +124,17 @@ func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
} }
func (fs *filterStreamID) applyToBlockSearch(bs *blockSearch, bm *bitmap) { func (fs *filterStreamID) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
m := fs.getStreamIDsMap()
if len(m) == 0 {
bm.resetBits()
return
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = bs.bsw.bh.streamID.marshalString(bb.B) bb.B = bs.bsw.bh.streamID.marshalString(bb.B)
ok := fs.streamIDStr == string(bb.B) _, ok := m[string(bb.B)]
bbPool.Put(bb) bbPool.Put(bb)
if !ok { if !ok {
bm.resetBits() bm.resetBits()
return return

View file

@ -12,19 +12,37 @@ func TestFilterStreamID(t *testing.T) {
t.Parallel() t.Parallel()
// match // match
var sid1 streamID
if !sid1.tryUnmarshalFromString("0000007b000001c8302bc96e02e54e5524b3a68ec271e55e") {
t.Fatalf("cannot unmarshal _stream_id")
}
ft := &filterStreamID{ ft := &filterStreamID{
streamIDStr: "0000007b000001c8302bc96e02e54e5524b3a68ec271e55e", streamIDs: []streamID{sid1},
} }
testFilterMatchForStreamID(t, ft, []int{0, 3, 6, 9}) testFilterMatchForStreamID(t, ft, []int{0, 3, 6, 9})
var sid2 streamID
if !sid2.tryUnmarshalFromString("0000007b000001c850d9950ea6196b1a4812081265faa1c7") {
t.Fatalf("cannot unmarshal _stream_id")
}
ft = &filterStreamID{ ft = &filterStreamID{
streamIDStr: "0000007b000001c850d9950ea6196b1a4812081265faa1c7", streamIDs: []streamID{sid2},
} }
testFilterMatchForStreamID(t, ft, []int{1, 4, 7}) testFilterMatchForStreamID(t, ft, []int{1, 4, 7})
ft = &filterStreamID{
streamIDs: []streamID{sid1, sid2},
}
testFilterMatchForStreamID(t, ft, []int{0, 1, 3, 4, 6, 7, 9})
// mismatch // mismatch
ft = &filterStreamID{ ft = &filterStreamID{
streamIDStr: "abc", streamIDs: nil,
}
testFilterMatchForStreamID(t, ft, nil)
ft = &filterStreamID{
streamIDs: []streamID{{}},
} }
testFilterMatchForStreamID(t, ft, nil) testFilterMatchForStreamID(t, ft, nil)
} }

View file

@ -3,7 +3,6 @@ package logstorage
import ( import (
"fmt" "fmt"
"math" "math"
"sort"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -235,46 +234,38 @@ func (q *Query) String() string {
return s return s
} }
func (q *Query) getSortedStreamIDs() []streamID { func (q *Query) getStreamIDs() []streamID {
switch t := q.f.(type) { switch t := q.f.(type) {
case *filterAnd: case *filterAnd:
for _, f := range t.filters { for _, f := range t.filters {
streamIDs, ok := getSortedStreamIDsFromFilterOr(f) streamIDs, ok := getStreamIDsFromFilterOr(f)
if ok { if ok {
return streamIDs return streamIDs
} }
} }
return nil return nil
default: default:
streamIDs, _ := getSortedStreamIDsFromFilterOr(q.f) streamIDs, _ := getStreamIDsFromFilterOr(q.f)
return streamIDs return streamIDs
} }
} }
func getSortedStreamIDsFromFilterOr(f filter) ([]streamID, bool) { func getStreamIDsFromFilterOr(f filter) ([]streamID, bool) {
switch t := f.(type) { switch t := f.(type) {
case *filterOr: case *filterOr:
streamIDsFilters := 0
var streamIDs []streamID var streamIDs []streamID
for _, f := range t.filters { for _, f := range t.filters {
fs, ok := f.(*filterStreamID) fs, ok := f.(*filterStreamID)
if !ok { if !ok {
return nil, false return nil, false
} }
var sid streamID streamIDsFilters++
if sid.tryUnmarshalFromString(fs.streamIDStr) { streamIDs = append(streamIDs, fs.streamIDs...)
streamIDs = append(streamIDs, sid)
}
} }
sort.Slice(streamIDs, func(i, j int) bool { return streamIDs, streamIDsFilters > 0
return streamIDs[i].less(&streamIDs[j])
})
return streamIDs, len(streamIDs) > 0
case *filterStreamID: case *filterStreamID:
var sid streamID return t.streamIDs, true
if !sid.tryUnmarshalFromString(t.streamIDStr) {
return nil, true
}
return []streamID{sid}, true
default: default:
return nil, false return nil, false
} }
@ -1830,17 +1821,89 @@ func stripTimezoneSuffix(s string) string {
return s[:len(s)-len(tz)] return s[:len(s)-len(tz)]
} }
func parseFilterStreamID(lex *lexer) (*filterStreamID, error) { func parseFilterStreamID(lex *lexer) (filter, error) {
s, err := getCompoundToken(lex) if lex.isKeyword("in") {
return parseFilterStreamIDIn(lex)
}
sid, err := parseStreamID(lex)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("cannot parse _stream_id: %w", err)
} }
fs := &filterStreamID{ fs := &filterStreamID{
streamIDStr: s, streamIDs: []streamID{sid},
} }
return fs, nil return fs, nil
} }
// parseFilterStreamIDIn parses the `in(...)` part of a `_stream_id:in(...)` filter.
//
// Two forms are attempted in order:
//
//   - in(id1, ..., idN) - an explicit list of stream IDs;
//   - in(query) - a subquery whose results supply the stream IDs later.
//
// The lexer must be positioned at the `in` keyword; an error is returned otherwise.
func parseFilterStreamIDIn(lex *lexer) (filter, error) {
	if !lex.isKeyword("in") {
		return nil, fmt.Errorf("unexpected token %q; expecting 'in'", lex.token)
	}

	// newFilterFromArgs converts the parsed args into a filter with explicit stream IDs.
	// It fails on the first arg which cannot be unmarshaled into a streamID.
	newFilterFromArgs := func(args []string) (filter, error) {
		sids := make([]streamID, len(args))
		for i := range args {
			if !sids[i].tryUnmarshalFromString(args[i]) {
				return nil, fmt.Errorf("cannot unmarshal _stream_id from %q", args[i])
			}
		}
		return &filterStreamID{streamIDs: sids}, nil
	}

	// Try parsing in(arg1, ..., argN) at first.
	// The lexer state is saved, so the second attempt can start from the same position.
	savedState := lex.backupState()
	if f, err := parseFuncArgs(lex, "", newFilterFromArgs); err == nil {
		return f, nil
	}

	// The list form failed. Rewind the lexer and try parsing in(query).
	lex.restoreState(savedState)
	lex.nextToken()
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing '(' after 'in'")
	}
	lex.nextToken()

	subquery, err := parseQuery(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse query inside 'in(...)': %w", err)
	}
	if !lex.isKeyword(")") {
		return nil, fmt.Errorf("missing ')' after 'in(%s)'", subquery)
	}
	lex.nextToken()

	// The subquery pipes determine which result field holds the values.
	fieldName, err := getFieldNameFromPipes(subquery.pipes)
	if err != nil {
		return nil, fmt.Errorf("cannot determine field name for values in 'in(%s)': %w", subquery, err)
	}

	// streamIDs are left empty here; needExecuteQuery marks the filter
	// as requiring the subquery to be executed before it can be applied.
	return &filterStreamID{
		needExecuteQuery: true,
		q:                subquery,
		qFieldName:       fieldName,
	}, nil
}
// parseStreamID reads a compound token from lex and unmarshals it into a streamID.
//
// An error is returned if the token cannot be read or isn't a valid `_stream_id` value.
func parseStreamID(lex *lexer) (streamID, error) {
	var sid streamID

	token, err := getCompoundToken(lex)
	if err != nil {
		return sid, err
	}

	if sid.tryUnmarshalFromString(token) {
		return sid, nil
	}
	return sid, fmt.Errorf("cannot unmarshal _stream_id from %q", token)
}
func parseFilterStream(lex *lexer) (*filterStream, error) { func parseFilterStream(lex *lexer) (*filterStream, error) {
sf, err := parseStreamFilter(lex) sf, err := parseStreamFilter(lex)
if err != nil { if err != nil {

View file

@ -697,8 +697,13 @@ func TestParseQuerySuccess(t *testing.T) {
f(`"" or foo:"" and not bar:""`, `"" or foo:"" !bar:""`) f(`"" or foo:"" and not bar:""`, `"" or foo:"" !bar:""`)
// _stream_id filter // _stream_id filter
f(`_stream_id:foo`, `_stream_id:foo`) f(`_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`)
f(`_stream_id:foo-bar/b:az`, `_stream_id:"foo-bar/b:az"`) f(`_stream_id:"0000007b000001c8302bc96e02e54e5524b3a68ec271e55e"`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`)
f(`_stream_id:in()`, `_stream_id:in()`)
f(`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e)`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`)
f(`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e, "0000007b000001c850d9950ea6196b1a4812081265faa1c7")`,
`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e,0000007b000001c850d9950ea6196b1a4812081265faa1c7)`)
f(`_stream_id:in(_time:5m | fields _stream_id)`, `_stream_id:in(_time:5m | fields _stream_id)`)
// _stream filters // _stream filters
f(`_stream:{}`, `_stream:{}`) f(`_stream:{}`, `_stream:{}`)
@ -1243,7 +1248,11 @@ func TestParseQueryFailure(t *testing.T) {
f("`foo") f("`foo")
// invalid _stream_id filters // invalid _stream_id filters
f("_stream_id:(foo)") f("_stream_id:foo")
f("_stream_id:()")
f("_stream_id:in(foo)")
f("_stream_id:in(foo | bar)")
f("_stream_id:in(* | stats by (x) count() y)")
// invalid _stream filters // invalid _stream filters
f("_stream:") f("_stream:")

View file

@ -105,7 +105,11 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
} }
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error { func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error {
streamIDs := q.getSortedStreamIDs() streamIDs := q.getStreamIDs()
sort.Slice(streamIDs, func(i, j int) bool {
return streamIDs[i].less(&streamIDs[j])
})
neededColumnNames, unneededColumnNames := q.getNeededColumns() neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{ so := &genericSearchOptions{
tenantIDs: tenantIDs, tenantIDs: tenantIDs,
@ -427,8 +431,14 @@ func hasFilterInWithQueryForFilter(f filter) bool {
return false return false
} }
visitFunc := func(f filter) bool { visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn) switch t := f.(type) {
return ok && fi.needExecuteQuery case *filterIn:
return t.needExecuteQuery
case *filterStreamID:
return t.needExecuteQuery
default:
return false
}
} }
return visitFilter(f, visitFunc) return visitFilter(f, visitFunc)
} }
@ -465,33 +475,71 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldVa
} }
visitFunc := func(f filter) bool { visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn) switch t := f.(type) {
return ok && fi.needExecuteQuery case *filterIn:
return t.needExecuteQuery
case *filterStreamID:
return t.needExecuteQuery
default:
return false
}
} }
copyFunc := func(f filter) (filter, error) { copyFunc := func(f filter) (filter, error) {
fi := f.(*filterIn) switch t := f.(type) {
case *filterIn:
qStr := fi.q.String() values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc)
values, ok := cache[qStr]
if !ok {
vs, err := getFieldValuesFunc(fi.q, fi.qFieldName)
if err != nil { if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err) return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err)
} }
cache[qStr] = vs
values = vs
}
fiNew := &filterIn{ fiNew := &filterIn{
fieldName: fi.fieldName, fieldName: t.fieldName,
q: fi.q, q: t.q,
values: values, values: values,
}
return fiNew, nil
case *filterStreamID:
values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err)
}
// convert values to streamID list
streamIDs := make([]streamID, 0, len(values))
for _, v := range values {
var sid streamID
if sid.tryUnmarshalFromString(v) {
streamIDs = append(streamIDs, sid)
}
}
fsNew := &filterStreamID{
streamIDs: streamIDs,
q: t.q,
}
return fsNew, nil
default:
return f, nil
} }
return fiNew, nil
} }
return copyFilter(f, visitFunc, copyFunc) return copyFilter(f, visitFunc, copyFunc)
} }
// getValuesForQuery returns the values of qFieldName for the query q.
//
// Results are memoized in cache under the string form of q, so getFieldValuesFunc
// is called only when cache has no entry for that key. Failed calls are not cached.
func getValuesForQuery(q *Query, qFieldName string, cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) ([]string, error) {
	cacheKey := q.String()
	if cached, found := cache[cacheKey]; found {
		return cached, nil
	}

	fetched, err := getFieldValuesFunc(q, qFieldName)
	if err != nil {
		return nil, err
	}
	cache[cacheKey] = fetched
	return fetched, nil
}
func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) { func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) {
pipesNew := make([]pipe, len(pipes)) pipesNew := make([]pipe, len(pipes))
for i, p := range pipes { for i, p := range pipes {

View file

@ -82,7 +82,7 @@ func TestStorageRunQuery(t *testing.T) {
} }
s.debugFlush() s.debugFlush()
mustRunQuery := func(tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) { mustRunQuery := func(t *testing.T, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) {
t.Helper() t.Helper()
err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock) err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
if err != nil { if err != nil {
@ -91,7 +91,7 @@ func TestStorageRunQuery(t *testing.T) {
} }
// run tests on the storage data // run tests on the storage data
t.Run("missing-tenant", func(_ *testing.T) { t.Run("missing-tenant", func(t *testing.T) {
q := mustParseQuery(`"log message"`) q := mustParseQuery(`"log message"`)
tenantID := TenantID{ tenantID := TenantID{
AccountID: 0, AccountID: 0,
@ -101,9 +101,9 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
}) })
t.Run("missing-message-text", func(_ *testing.T) { t.Run("missing-message-text", func(t *testing.T) {
q := mustParseQuery(`foobar`) q := mustParseQuery(`foobar`)
tenantID := TenantID{ tenantID := TenantID{
AccountID: 1, AccountID: 1,
@ -113,7 +113,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
}) })
t.Run("matching-tenant-id", func(t *testing.T) { t.Run("matching-tenant-id", func(t *testing.T) {
q := mustParseQuery(`tenant.id:*`) q := mustParseQuery(`tenant.id:*`)
@ -147,7 +147,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -161,7 +161,7 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
mustRunQuery(allTenantIDs, q, writeBlock) mustRunQuery(t, allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -174,19 +174,19 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
mustRunQuery(allTenantIDs, q, writeBlock) mustRunQuery(t, allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
} }
}) })
t.Run("stream-filter-mismatch", func(_ *testing.T) { t.Run("stream-filter-mismatch", func(t *testing.T) {
q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`) q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
} }
mustRunQuery(allTenantIDs, q, writeBlock) mustRunQuery(t, allTenantIDs, q, writeBlock)
}) })
t.Run("matching-stream-id", func(t *testing.T) { t.Run("matching-stream-id", func(t *testing.T) {
for i := 0; i < streamsPerTenant; i++ { for i := 0; i < streamsPerTenant; i++ {
@ -220,7 +220,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream * rowsPerBlock expectedRowsCount := blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -239,7 +239,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * 2 expectedRowsCount := streamsPerTenant * blocksPerStream * 2
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -259,7 +259,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream expectedRowsCount := streamsPerTenant * blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -279,14 +279,14 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps))) rowsCountTotal.Add(uint32(len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream expectedRowsCount := blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
} }
}) })
t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) { t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9 maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
q := mustParseQuery(fmt.Sprintf(`_stream:{job="foobar",instance="host-1:234"} _time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9)) q := mustParseQuery(fmt.Sprintf(`_stream:{job="foobar",instance="host-1:234"} _time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
@ -298,9 +298,9 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
}) })
t.Run("missing-time-range", func(_ *testing.T) { t.Run("missing-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9 maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
q := mustParseQuery(fmt.Sprintf(`_time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9)) q := mustParseQuery(fmt.Sprintf(`_time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
@ -312,7 +312,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
} }
tenantIDs := []TenantID{tenantID} tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock) mustRunQuery(t, tenantIDs, q, writeBlock)
}) })
t.Run("field_names-all", func(t *testing.T) { t.Run("field_names-all", func(t *testing.T) {
q := mustParseQuery("*") q := mustParseQuery("*")
@ -493,7 +493,7 @@ func TestStorageRunQuery(t *testing.T) {
resultRowsLock.Unlock() resultRowsLock.Unlock()
} }
} }
mustRunQuery(allTenantIDs, q, writeBlock) mustRunQuery(t, allTenantIDs, q, writeBlock)
assertRowsEqual(t, resultRows, rowsExpected) assertRowsEqual(t, resultRows, rowsExpected)
} }
@ -505,6 +505,13 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("_stream_id-filter", func(t *testing.T) {
f(t, `_stream_id:in(tenant.id:2 | fields _stream_id) | stats count() rows`, [][]Field{
{
{"rows", "105"},
},
})
})
t.Run("in-filter-with-subquery-match", func(t *testing.T) { t.Run("in-filter-with-subquery-match", func(t *testing.T) {
f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{ f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
{ {
@ -545,7 +552,7 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("pipe-extract", func(*testing.T) { t.Run("pipe-extract", func(t *testing.T) {
f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{ f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
{ {
{"host", "0"}, {"host", "0"},
@ -561,7 +568,7 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("pipe-extract-if-filter-with-subquery", func(*testing.T) { t.Run("pipe-extract-if-filter-with-subquery", func(t *testing.T) {
f(t, `* | extract f(t, `* | extract
if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id)) if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
"host-<host>:" from instance "host-<host>:" from instance
@ -590,7 +597,7 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(*testing.T) { t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(t *testing.T) {
f(t, `* | extract f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id)) if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance "host-<host>:" from instance
@ -611,7 +618,7 @@ func TestStorageRunQuery(t *testing.T) {
}, },
}) })
}) })
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(*testing.T) { t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(t *testing.T) {
f(t, `* | extract f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id)) if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance "host-<host>:" from instance
@ -717,7 +724,7 @@ func TestStorageSearch(t *testing.T) {
} }
} }
t.Run("missing-tenant-smaller-than-existing", func(_ *testing.T) { t.Run("missing-tenant-smaller-than-existing", func(t *testing.T) {
tenantID := TenantID{ tenantID := TenantID{
AccountID: 0, AccountID: 0,
ProjectID: 0, ProjectID: 0,
@ -735,7 +742,7 @@ func TestStorageSearch(t *testing.T) {
} }
s.search(workersCount, so, nil, processBlock) s.search(workersCount, so, nil, processBlock)
}) })
t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) { t.Run("missing-tenant-bigger-than-existing", func(t *testing.T) {
tenantID := TenantID{ tenantID := TenantID{
AccountID: tenantsCount + 1, AccountID: tenantsCount + 1,
ProjectID: 0, ProjectID: 0,
@ -753,7 +760,7 @@ func TestStorageSearch(t *testing.T) {
} }
s.search(workersCount, so, nil, processBlock) s.search(workersCount, so, nil, processBlock)
}) })
t.Run("missing-tenant-middle", func(_ *testing.T) { t.Run("missing-tenant-middle", func(t *testing.T) {
tenantID := TenantID{ tenantID := TenantID{
AccountID: 1, AccountID: 1,
ProjectID: 0, ProjectID: 0,
@ -817,7 +824,7 @@ func TestStorageSearch(t *testing.T) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
} }
}) })
t.Run("stream-filter-mismatch", func(_ *testing.T) { t.Run("stream-filter-mismatch", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-.+:2345"}`) sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-.+:2345"}`)
minTimestamp := baseTimestamp minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
@ -943,7 +950,7 @@ func TestStorageSearch(t *testing.T) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
} }
}) })
t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) { t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`) sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`)
tenantID := TenantID{ tenantID := TenantID{
AccountID: 1, AccountID: 1,