lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-24 23:27:12 +02:00
parent 8dd69fd7ae
commit 7de6f5b4ce
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
17 changed files with 320 additions and 96 deletions

View file

@ -42,7 +42,7 @@ services:
# storing logs and serving read queries.
victorialogs:
container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
command:
- "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- victorialogs-filebeat-docker-vl:/vlogs
ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- victorialogs-filebeat-syslog-vl:/vlogs
ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- victorialogs-fluentbit-vl:/vlogs
ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- victorialogs-logstash-vl:/vlogs
ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140"
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- victorialogs-promtail-docker:/vlogs
ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- victorialogs-vector-docker-vl:/vlogs
ports:

View file

@ -3,7 +3,7 @@ version: '3'
services:
# Run `make package-victoria-logs` to build victoria-logs image
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
volumes:
- vlogs:/vlogs
ports:

View file

@ -19,6 +19,13 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.22.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.22.0-victorialogs)
Released at 2024-06-24
* FEATURE: allow specifying multiple `_stream_id` values in [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter) via `_stream_id:in(id1, ..., idN)` syntax.
* FEATURE: allow specifying a subquery for searching for `_stream_id` values inside [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter). For example, `_stream_id:in(_time:5m error | fields _stream_id)` returns logs for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with the `error` word across logs for the last 5 minutes.
## [v0.21.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.21.0-victorialogs)
Released at 2024-06-20

View file

@ -479,6 +479,20 @@ query selects logs for the given stream for the last hour:
_time:1h _stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7
```
The `_stream_id` filter supports specifying multiple `_stream_id` values via `_stream_id:in(...)` syntax. For example:
```logsql
_stream_id:in(0000007b000001c850d9950ea6196b1a4812081265faa1c7, 1230007b456701c850d9950ea6196b1a4812081265fff2a9)
```
It is also possible to specify a subquery inside `in(...)`, which selects the needed `_stream_id` values. For example, the following query returns
logs for [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `error` [word](#word)
in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) during the last 5 minutes:
```logsql
_stream_id:in(_time:5m error | fields _stream_id)
```
See also:
- [stream filter](#stream-filter)

View file

@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.21.0-victorialogs/victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.22.0-victorialogs/victoria-logs-linux-amd64-v0.22.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.22.0-victorialogs.tar.gz
./victoria-logs-prod
```
@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
docker.io/victoriametrics/victoria-logs:v0.22.0-victorialogs
```
See also:

View file

@ -1,27 +1,76 @@
package logstorage
import (
"strings"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// filterStreamID is the filter for `_stream_id:id`.
//
// It also represents the `_stream_id:in(...)` form, which carries either a list of ids or a subquery.
type filterStreamID struct {
streamIDStr string
// streamIDs holds the stream ids matched by this filter.
streamIDs []streamID
// needExecuteQuery is set to true if q must be executed for populating streamIDs before filter execution.
needExecuteQuery bool
// If q is non-nil, then streamIDs must be populated from q before filter execution.
q *Query
// qFieldName is the name of the field to obtain values from; it must be set if q is non-nil.
qFieldName string
// streamIDsMap is the set of marshaled streamIDs; it is built lazily by initStreamIDsMap.
streamIDsMap map[string]struct{}
// streamIDsMapOnce makes sure streamIDsMap is initialized only once.
streamIDsMapOnce sync.Once
}
func (fs *filterStreamID) String() string {
return "_stream_id:" + quoteTokenIfNeeded(fs.streamIDStr)
if fs.q != nil {
return "_stream_id:in(" + fs.q.String() + ")"
}
streamIDs := fs.streamIDs
if len(streamIDs) == 1 {
return "_stream_id:" + string(streamIDs[0].marshalString(nil))
}
a := make([]string, len(streamIDs))
for i, streamID := range streamIDs {
a[i] = string(streamID.marshalString(nil))
}
return "_stream_id:in(" + strings.Join(a, ",") + ")"
}
// updateNeededFields registers the `_stream_id` field in neededFields,
// since this is the only field the filter reads.
func (fs *filterStreamID) updateNeededFields(neededFields fieldsSet) {
	const streamIDField = "_stream_id"
	neededFields.add(streamIDField)
}
// getStreamIDsMap returns the set of marshaled stream ids for fs.
//
// The set is built on the first call via initStreamIDsMap and reused afterwards.
func (fs *filterStreamID) getStreamIDsMap() map[string]struct{} {
	once := &fs.streamIDsMapOnce
	once.Do(fs.initStreamIDsMap)
	return fs.streamIDsMap
}
// initStreamIDsMap builds fs.streamIDsMap from fs.streamIDs.
//
// Every stream id is stored under its marshaled string form.
func (fs *filterStreamID) initStreamIDsMap() {
	ids := fs.streamIDs
	set := make(map[string]struct{}, len(ids))
	for i := range ids {
		key := string(ids[i].marshalString(nil))
		set[key] = struct{}{}
	}
	fs.streamIDsMap = set
}
func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
m := fs.getStreamIDsMap()
if len(m) == 0 {
bm.resetBits()
return
}
c := br.getColumnByName("_stream_id")
if c.isConst {
v := c.valuesEncoded[0]
if fs.streamIDStr != v {
if _, ok := m[v]; !ok {
bm.resetBits()
}
return
@ -36,16 +85,18 @@ func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
values := c.getValues(br)
bm.forEachSetBit(func(idx int) bool {
v := values[idx]
return fs.streamIDStr == v
_, ok := m[v]
return ok
})
case valueTypeDict:
bb := bbPool.Get()
for _, v := range c.dictValues {
c := byte(0)
if fs.streamIDStr == v {
c = 1
ch := byte(0)
_, ok := m[v]
if ok {
ch = 1
}
bb.B = append(bb.B, c)
bb.B = append(bb.B, ch)
}
valuesEncoded := c.getValuesEncoded(br)
bm.forEachSetBit(func(idx int) bool {
@ -73,10 +124,17 @@ func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
}
func (fs *filterStreamID) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
m := fs.getStreamIDsMap()
if len(m) == 0 {
bm.resetBits()
return
}
bb := bbPool.Get()
bb.B = bs.bsw.bh.streamID.marshalString(bb.B)
ok := fs.streamIDStr == string(bb.B)
_, ok := m[string(bb.B)]
bbPool.Put(bb)
if !ok {
bm.resetBits()
return

View file

@ -12,19 +12,37 @@ func TestFilterStreamID(t *testing.T) {
t.Parallel()
// match
var sid1 streamID
if !sid1.tryUnmarshalFromString("0000007b000001c8302bc96e02e54e5524b3a68ec271e55e") {
t.Fatalf("cannot unmarshal _stream_id")
}
ft := &filterStreamID{
streamIDStr: "0000007b000001c8302bc96e02e54e5524b3a68ec271e55e",
streamIDs: []streamID{sid1},
}
testFilterMatchForStreamID(t, ft, []int{0, 3, 6, 9})
var sid2 streamID
if !sid2.tryUnmarshalFromString("0000007b000001c850d9950ea6196b1a4812081265faa1c7") {
t.Fatalf("cannot unmarshal _stream_id")
}
ft = &filterStreamID{
streamIDStr: "0000007b000001c850d9950ea6196b1a4812081265faa1c7",
streamIDs: []streamID{sid2},
}
testFilterMatchForStreamID(t, ft, []int{1, 4, 7})
ft = &filterStreamID{
streamIDs: []streamID{sid1, sid2},
}
testFilterMatchForStreamID(t, ft, []int{0, 1, 3, 4, 6, 7, 9})
// mismatch
ft = &filterStreamID{
streamIDStr: "abc",
streamIDs: nil,
}
testFilterMatchForStreamID(t, ft, nil)
ft = &filterStreamID{
streamIDs: []streamID{{}},
}
testFilterMatchForStreamID(t, ft, nil)
}

View file

@ -3,7 +3,6 @@ package logstorage
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
@ -235,46 +234,38 @@ func (q *Query) String() string {
return s
}
func (q *Query) getSortedStreamIDs() []streamID {
func (q *Query) getStreamIDs() []streamID {
switch t := q.f.(type) {
case *filterAnd:
for _, f := range t.filters {
streamIDs, ok := getSortedStreamIDsFromFilterOr(f)
streamIDs, ok := getStreamIDsFromFilterOr(f)
if ok {
return streamIDs
}
}
return nil
default:
streamIDs, _ := getSortedStreamIDsFromFilterOr(q.f)
streamIDs, _ := getStreamIDsFromFilterOr(q.f)
return streamIDs
}
}
func getSortedStreamIDsFromFilterOr(f filter) ([]streamID, bool) {
func getStreamIDsFromFilterOr(f filter) ([]streamID, bool) {
switch t := f.(type) {
case *filterOr:
streamIDsFilters := 0
var streamIDs []streamID
for _, f := range t.filters {
fs, ok := f.(*filterStreamID)
if !ok {
return nil, false
}
var sid streamID
if sid.tryUnmarshalFromString(fs.streamIDStr) {
streamIDs = append(streamIDs, sid)
}
streamIDsFilters++
streamIDs = append(streamIDs, fs.streamIDs...)
}
sort.Slice(streamIDs, func(i, j int) bool {
return streamIDs[i].less(&streamIDs[j])
})
return streamIDs, len(streamIDs) > 0
return streamIDs, streamIDsFilters > 0
case *filterStreamID:
var sid streamID
if !sid.tryUnmarshalFromString(t.streamIDStr) {
return nil, true
}
return []streamID{sid}, true
return t.streamIDs, true
default:
return nil, false
}
@ -1830,17 +1821,89 @@ func stripTimezoneSuffix(s string) string {
return s[:len(s)-len(tz)]
}
func parseFilterStreamID(lex *lexer) (*filterStreamID, error) {
s, err := getCompoundToken(lex)
func parseFilterStreamID(lex *lexer) (filter, error) {
if lex.isKeyword("in") {
return parseFilterStreamIDIn(lex)
}
sid, err := parseStreamID(lex)
if err != nil {
return nil, err
return nil, fmt.Errorf("cannot parse _stream_id: %w", err)
}
fs := &filterStreamID{
streamIDStr: s,
streamIDs: []streamID{sid},
}
return fs, nil
}
// parseFilterStreamIDIn parses the `in(...)` part of a `_stream_id:in(...)` filter.
//
// It first tries to parse `in(id1, ..., idN)`. If that fails, the lexer state
// is restored and the contents are parsed as a subquery, i.e. `in(query)`.
// The returned filter for the subquery form has needExecuteQuery set.
func parseFilterStreamIDIn(lex *lexer) (filter, error) {
	if !lex.isKeyword("in") {
		return nil, fmt.Errorf("unexpected token %q; expecting 'in'", lex.token)
	}

	// First attempt: a plain list of stream ids.
	savedState := lex.backupState()
	parseIDs := func(args []string) (filter, error) {
		ids := make([]streamID, len(args))
		for i := range args {
			if ok := ids[i].tryUnmarshalFromString(args[i]); !ok {
				return nil, fmt.Errorf("cannot unmarshal _stream_id from %q", args[i])
			}
		}
		return &filterStreamID{
			streamIDs: ids,
		}, nil
	}
	if f, err := parseFuncArgs(lex, "", parseIDs); err == nil {
		return f, nil
	}

	// Second attempt: rewind the lexer and parse a subquery.
	lex.restoreState(savedState)
	lex.nextToken()
	if !lex.isKeyword("(") {
		return nil, fmt.Errorf("missing '(' after 'in'")
	}
	lex.nextToken()

	subquery, err := parseQuery(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse query inside 'in(...)': %w", err)
	}
	if !lex.isKeyword(")") {
		return nil, fmt.Errorf("missing ')' after 'in(%s)'", subquery)
	}
	lex.nextToken()

	fieldName, err := getFieldNameFromPipes(subquery.pipes)
	if err != nil {
		return nil, fmt.Errorf("cannot determine field name for values in 'in(%s)': %w", subquery, err)
	}

	return &filterStreamID{
		needExecuteQuery: true,
		q:                subquery,
		qFieldName:       fieldName,
	}, nil
}
// parseStreamID reads a compound token from lex and unmarshals it into a streamID.
//
// It returns an error if the token cannot be read or is not a valid _stream_id.
func parseStreamID(lex *lexer) (streamID, error) {
	var sid streamID

	token, err := getCompoundToken(lex)
	if err != nil {
		return sid, err
	}
	if sid.tryUnmarshalFromString(token) {
		return sid, nil
	}
	return sid, fmt.Errorf("cannot unmarshal _stream_id from %q", token)
}
func parseFilterStream(lex *lexer) (*filterStream, error) {
sf, err := parseStreamFilter(lex)
if err != nil {

View file

@ -697,8 +697,13 @@ func TestParseQuerySuccess(t *testing.T) {
f(`"" or foo:"" and not bar:""`, `"" or foo:"" !bar:""`)
// _stream_id filter
f(`_stream_id:foo`, `_stream_id:foo`)
f(`_stream_id:foo-bar/b:az`, `_stream_id:"foo-bar/b:az"`)
f(`_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`)
f(`_stream_id:"0000007b000001c8302bc96e02e54e5524b3a68ec271e55e"`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`)
f(`_stream_id:in()`, `_stream_id:in()`)
f(`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e)`, `_stream_id:0000007b000001c8302bc96e02e54e5524b3a68ec271e55e`)
f(`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e, "0000007b000001c850d9950ea6196b1a4812081265faa1c7")`,
`_stream_id:in(0000007b000001c8302bc96e02e54e5524b3a68ec271e55e,0000007b000001c850d9950ea6196b1a4812081265faa1c7)`)
f(`_stream_id:in(_time:5m | fields _stream_id)`, `_stream_id:in(_time:5m | fields _stream_id)`)
// _stream filters
f(`_stream:{}`, `_stream:{}`)
@ -1243,7 +1248,11 @@ func TestParseQueryFailure(t *testing.T) {
f("`foo")
// invalid _stream_id filters
f("_stream_id:(foo)")
f("_stream_id:foo")
f("_stream_id:()")
f("_stream_id:in(foo)")
f("_stream_id:in(foo | bar)")
f("_stream_id:in(* | stats by (x) count() y)")
// invalid _stream filters
f("_stream:")

View file

@ -105,7 +105,11 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
}
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error {
streamIDs := q.getSortedStreamIDs()
streamIDs := q.getStreamIDs()
sort.Slice(streamIDs, func(i, j int) bool {
return streamIDs[i].less(&streamIDs[j])
})
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
@ -427,8 +431,14 @@ func hasFilterInWithQueryForFilter(f filter) bool {
return false
}
visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn)
return ok && fi.needExecuteQuery
switch t := f.(type) {
case *filterIn:
return t.needExecuteQuery
case *filterStreamID:
return t.needExecuteQuery
default:
return false
}
}
return visitFilter(f, visitFunc)
}
@ -465,33 +475,71 @@ func initFilterInValuesForFilter(cache map[string][]string, f filter, getFieldVa
}
visitFunc := func(f filter) bool {
fi, ok := f.(*filterIn)
return ok && fi.needExecuteQuery
switch t := f.(type) {
case *filterIn:
return t.needExecuteQuery
case *filterStreamID:
return t.needExecuteQuery
default:
return false
}
}
copyFunc := func(f filter) (filter, error) {
fi := f.(*filterIn)
qStr := fi.q.String()
values, ok := cache[qStr]
if !ok {
vs, err := getFieldValuesFunc(fi.q, fi.qFieldName)
switch t := f.(type) {
case *filterIn:
values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", fi, err)
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err)
}
cache[qStr] = vs
values = vs
}
fiNew := &filterIn{
fieldName: fi.fieldName,
q: fi.q,
values: values,
fiNew := &filterIn{
fieldName: t.fieldName,
q: t.q,
values: values,
}
return fiNew, nil
case *filterStreamID:
values, err := getValuesForQuery(t.q, t.qFieldName, cache, getFieldValuesFunc)
if err != nil {
return nil, fmt.Errorf("cannot obtain unique values for %s: %w", t, err)
}
// convert values to streamID list
streamIDs := make([]streamID, 0, len(values))
for _, v := range values {
var sid streamID
if sid.tryUnmarshalFromString(v) {
streamIDs = append(streamIDs, sid)
}
}
fsNew := &filterStreamID{
streamIDs: streamIDs,
q: t.q,
}
return fsNew, nil
default:
return f, nil
}
return fiNew, nil
}
return copyFilter(f, visitFunc, copyFunc)
}
// getValuesForQuery returns the values of qFieldName produced by q.
//
// Results are memoized in cache under q.String(), so the query is executed
// via getFieldValuesFunc only when the cache has no entry for it.
// Failed executions are not cached.
func getValuesForQuery(q *Query, qFieldName string, cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) ([]string, error) {
	cacheKey := q.String()
	if cached, found := cache[cacheKey]; found {
		return cached, nil
	}

	fetched, err := getFieldValuesFunc(q, qFieldName)
	if err != nil {
		return nil, err
	}
	cache[cacheKey] = fetched
	return fetched, nil
}
func initFilterInValuesForPipes(cache map[string][]string, pipes []pipe, getFieldValuesFunc getFieldValuesFunc) ([]pipe, error) {
pipesNew := make([]pipe, len(pipes))
for i, p := range pipes {

View file

@ -82,7 +82,7 @@ func TestStorageRunQuery(t *testing.T) {
}
s.debugFlush()
mustRunQuery := func(tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) {
mustRunQuery := func(t *testing.T, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) {
t.Helper()
err := s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
if err != nil {
@ -91,7 +91,7 @@ func TestStorageRunQuery(t *testing.T) {
}
// run tests on the storage data
t.Run("missing-tenant", func(_ *testing.T) {
t.Run("missing-tenant", func(t *testing.T) {
q := mustParseQuery(`"log message"`)
tenantID := TenantID{
AccountID: 0,
@ -101,9 +101,9 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("missing-message-text", func(_ *testing.T) {
t.Run("missing-message-text", func(t *testing.T) {
q := mustParseQuery(`foobar`)
tenantID := TenantID{
AccountID: 1,
@ -113,7 +113,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("matching-tenant-id", func(t *testing.T) {
q := mustParseQuery(`tenant.id:*`)
@ -147,7 +147,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -161,7 +161,7 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
mustRunQuery(allTenantIDs, q, writeBlock)
mustRunQuery(t, allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -174,19 +174,19 @@ func TestStorageRunQuery(t *testing.T) {
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
mustRunQuery(allTenantIDs, q, writeBlock)
mustRunQuery(t, allTenantIDs, q, writeBlock)
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("stream-filter-mismatch", func(_ *testing.T) {
t.Run("stream-filter-mismatch", func(t *testing.T) {
q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
mustRunQuery(allTenantIDs, q, writeBlock)
mustRunQuery(t, allTenantIDs, q, writeBlock)
})
t.Run("matching-stream-id", func(t *testing.T) {
for i := 0; i < streamsPerTenant; i++ {
@ -220,7 +220,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream * rowsPerBlock
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -239,7 +239,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -259,7 +259,7 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := streamsPerTenant * blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@ -279,14 +279,14 @@ func TestStorageRunQuery(t *testing.T) {
rowsCountTotal.Add(uint32(len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
expectedRowsCount := blocksPerStream
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) {
t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
q := mustParseQuery(fmt.Sprintf(`_stream:{job="foobar",instance="host-1:234"} _time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
@ -298,9 +298,9 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("missing-time-range", func(_ *testing.T) {
t.Run("missing-time-range", func(t *testing.T) {
minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
maxTimestamp := baseTimestamp + (rowsPerBlock+2)*1e9
q := mustParseQuery(fmt.Sprintf(`_time:[%d, %d)`, minTimestamp/1e9, maxTimestamp/1e9))
@ -312,7 +312,7 @@ func TestStorageRunQuery(t *testing.T) {
panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
}
tenantIDs := []TenantID{tenantID}
mustRunQuery(tenantIDs, q, writeBlock)
mustRunQuery(t, tenantIDs, q, writeBlock)
})
t.Run("field_names-all", func(t *testing.T) {
q := mustParseQuery("*")
@ -493,7 +493,7 @@ func TestStorageRunQuery(t *testing.T) {
resultRowsLock.Unlock()
}
}
mustRunQuery(allTenantIDs, q, writeBlock)
mustRunQuery(t, allTenantIDs, q, writeBlock)
assertRowsEqual(t, resultRows, rowsExpected)
}
@ -505,6 +505,13 @@ func TestStorageRunQuery(t *testing.T) {
},
})
})
t.Run("_stream_id-filter", func(t *testing.T) {
f(t, `_stream_id:in(tenant.id:2 | fields _stream_id) | stats count() rows`, [][]Field{
{
{"rows", "105"},
},
})
})
t.Run("in-filter-with-subquery-match", func(t *testing.T) {
f(t, `tenant.id:in(tenant.id:2 | fields tenant.id) | stats count() rows`, [][]Field{
{
@ -545,7 +552,7 @@ func TestStorageRunQuery(t *testing.T) {
},
})
})
t.Run("pipe-extract", func(*testing.T) {
t.Run("pipe-extract", func(t *testing.T) {
f(t, `* | extract "host-<host>:" from instance | uniq (host) with hits | sort by (host)`, [][]Field{
{
{"host", "0"},
@ -561,7 +568,7 @@ func TestStorageRunQuery(t *testing.T) {
},
})
})
t.Run("pipe-extract-if-filter-with-subquery", func(*testing.T) {
t.Run("pipe-extract-if-filter-with-subquery", func(t *testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:(3 or 4) | fields tenant.id))
"host-<host>:" from instance
@ -590,7 +597,7 @@ func TestStorageRunQuery(t *testing.T) {
},
})
})
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(*testing.T) {
t.Run("pipe-extract-if-filter-with-subquery-non-empty-host", func(t *testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance
@ -611,7 +618,7 @@ func TestStorageRunQuery(t *testing.T) {
},
})
})
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(*testing.T) {
t.Run("pipe-extract-if-filter-with-subquery-empty-host", func(t *testing.T) {
f(t, `* | extract
if (tenant.id:in(tenant.id:3 | fields tenant.id))
"host-<host>:" from instance
@ -717,7 +724,7 @@ func TestStorageSearch(t *testing.T) {
}
}
t.Run("missing-tenant-smaller-than-existing", func(_ *testing.T) {
t.Run("missing-tenant-smaller-than-existing", func(t *testing.T) {
tenantID := TenantID{
AccountID: 0,
ProjectID: 0,
@ -735,7 +742,7 @@ func TestStorageSearch(t *testing.T) {
}
s.search(workersCount, so, nil, processBlock)
})
t.Run("missing-tenant-bigger-than-existing", func(_ *testing.T) {
t.Run("missing-tenant-bigger-than-existing", func(t *testing.T) {
tenantID := TenantID{
AccountID: tenantsCount + 1,
ProjectID: 0,
@ -753,7 +760,7 @@ func TestStorageSearch(t *testing.T) {
}
s.search(workersCount, so, nil, processBlock)
})
t.Run("missing-tenant-middle", func(_ *testing.T) {
t.Run("missing-tenant-middle", func(t *testing.T) {
tenantID := TenantID{
AccountID: 1,
ProjectID: 0,
@ -817,7 +824,7 @@ func TestStorageSearch(t *testing.T) {
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("stream-filter-mismatch", func(_ *testing.T) {
t.Run("stream-filter-mismatch", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance=~"host-.+:2345"}`)
minTimestamp := baseTimestamp
maxTimestamp := baseTimestamp + rowsPerBlock*1e9 + blocksPerStream
@ -943,7 +950,7 @@ func TestStorageSearch(t *testing.T) {
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
}
})
t.Run("matching-stream-id-missing-time-range", func(_ *testing.T) {
t.Run("matching-stream-id-missing-time-range", func(t *testing.T) {
sf := mustNewTestStreamFilter(`{job="foobar",instance="host-1:234"}`)
tenantID := TenantID{
AccountID: 1,