diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index acdc927a1..9bbdfc357 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -42,11 +42,10 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s sw := getSortWriter() sw.Init(w, maxSortBufferSize.IntN(), limit) tenantIDs := []logstorage.TenantID{tenantID} - vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) { + vlstorage.RunQuery(tenantIDs, q, stopCh, func(_ uint, rowsCount int, columns []logstorage.BlockColumn) { if len(columns) == 0 { return } - rowsCount := len(columns[0].Values) bb := blockResultPool.Get() for rowIdx := 0; rowIdx < rowsCount; rowIdx++ { diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index b1b55675c..da28aca44 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -100,7 +100,7 @@ func MustAddRows(lr *logstorage.LogRows) { } // RunQuery runs the given q and calls processBlock for the returned data blocks -func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) { +func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []logstorage.BlockColumn)) { strg.RunQuery(tenantIDs, q, stopCh, processBlock) } diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index d4b662657..7c37c9507 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -1056,7 +1056,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying LogsQL will support calculating the following stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) and fields created by [transformations](#transformations): -- The number of selected logs. +- The number of selected logs via `query | stats count() as total` syntax. - The number of non-empty values for the given field. - The number of unique values for the given field. - The min, max, avg, and sum for the given field. diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index a0ea0e3dd..c3778f109 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -399,17 +399,19 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *filterBitmap) { br.streamID = bs.bsw.bh.streamID - if !bm.isZero() { - // Initialize timestamps, since they are used for determining the number of rows in br.RowsCount() - srcTimestamps := bs.getTimestamps() - dstTimestamps := br.timestamps[:0] - bm.forEachSetBit(func(idx int) bool { - ts := srcTimestamps[idx] - dstTimestamps = append(dstTimestamps, ts) - return true - }) - br.timestamps = dstTimestamps + if bm.isZero() { + // Nothing to initialize for zero matching log entries in the block. + return } + // Initialize timestamps, since they are used for determining the number of rows in br.RowsCount() + srcTimestamps := bs.getTimestamps() + dstTimestamps := br.timestamps[:0] + bm.forEachSetBit(func(idx int) bool { + ts := srcTimestamps[idx] + dstTimestamps = append(dstTimestamps, ts) + return true + }) + br.timestamps = dstTimestamps } func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *filterBitmap) { diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 0b31dc995..265fd9fb7 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -187,31 +187,28 @@ func (lex *lexer) nextToken() { type Query struct { f filter - // fields contains optional list of fields to fetch - fields []string + pipes []pipe } // String returns string representation for q. func (q *Query) String() string { s := q.f.String() - if len(q.fields) > 0 { - a := make([]string, len(q.fields)) - for i, f := range q.fields { - if f != "*" { - f = quoteTokenIfNeeded(f) - } - a[i] = f - } - s += " | fields " + strings.Join(a, ", ") + for _, p := range q.pipes { + s += " | " + p.String() } return s } func (q *Query) getResultColumnNames() []string { - if len(q.fields) > 0 { - return q.fields + for _, p := range q.pipes { + switch t := p.(type) { + case *fieldsPipe: + return t.fields + case *statsPipe: + return t.neededFields() + } } return []string{"*"} } @@ -228,68 +225,15 @@ func ParseQuery(s string) (*Query, error) { f: f, } - if err := q.parsePipes(lex); err != nil { + pipes, err := parsePipes(lex) + if err != nil { return nil, fmt.Errorf("%w; context: %s", err, lex.context()) } + q.pipes = pipes return q, nil } -func (q *Query) parsePipes(lex *lexer) error { - for { - if lex.isEnd() { - return nil - } - if !lex.isKeyword("|") { - return fmt.Errorf("expecting '|'") - } - if !lex.mustNextToken() { - return fmt.Errorf("missing token after '|'") - } - switch { - case lex.isKeyword("fields"): - if err := q.parseFieldsPipe(lex); err != nil { - return fmt.Errorf("cannot parse fields pipe: %w", err) - } - default: - return fmt.Errorf("unexpected pipe %q", lex.token) - } - } -} - -func (q *Query) parseFieldsPipe(lex *lexer) error { - var fields []string - - for { - if !lex.mustNextToken() { - return fmt.Errorf("missing field name") - } - if lex.isKeyword(",") { - return fmt.Errorf("unexpected ','; expecting field name") - } - field := parseFieldName(lex) - fields = append(fields, field) - switch { - case lex.isKeyword("|", ""): - q.fields = fields - return nil - case lex.isKeyword(","): - default: - return fmt.Errorf("unexpected token: %q; expecting ','", lex.token) - } - } -} - -func parseFieldName(lex *lexer) string { - s := lex.token - lex.nextToken() - for !lex.isSkippedSpace && !lex.isKeyword(",", "|", "") { - s += lex.rawToken - lex.nextToken() - } - return s -} - func parseFilter(lex *lexer) (filter, error) { if !lex.mustNextToken() || lex.isKeyword("|") { return nil, fmt.Errorf("missing query") diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 8b01ceae5..3deaeede9 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -805,11 +805,18 @@ func TestParseQuerySuccess(t *testing.T) { `(_time:(2023-04-20,now] or _time:[-10m,-1m)) (_stream:{job="a"} or _stream:{instance!="b"}) (err* or ip:ipv4_range(1.2.3.0, 1.2.3.255) !ip:1.2.3.4)`) // fields pipe - f(`foo | fields *`, `foo | fields *`) + f(`foo|fields *`, `foo | fields *`) f(`foo | fields bar`, `foo | fields bar`) - f(`foo | FIELDS bar,Baz , "a,b|c"`, `foo | fields bar, Baz, "a,b|c"`) + f(`foo|FIELDS bar,Baz , "a,b|c"`, `foo | fields bar, Baz, "a,b|c"`) f(`foo | Fields x.y:z/a, _b$c`, `foo | fields "x.y:z/a", "_b$c"`) - f(`foo | fields bar | fields baz, abc`, `foo | fields baz, abc`) + + // multiple fields pipes + f(`foo | fields bar | fields baz, abc`, `foo | fields bar | fields baz, abc`) + + // stats count pipe + f(`* | Stats count() AS foo`, `* | stats count() as foo`) + f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count() as XYz`) + f(`* | stats by() count(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`) } func TestParseQueryFailure(t *testing.T) { diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go new file mode 100644 index 000000000..fb027d4c2 --- /dev/null +++ b/lib/logstorage/pipes.go @@ -0,0 +1,271 @@ +package logstorage + +import ( + "fmt" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +type pipe interface { + String() string +} + +func parsePipes(lex *lexer) ([]pipe, error) { + var pipes []pipe + for !lex.isEnd() { + if !lex.isKeyword("|") { + return nil, fmt.Errorf("expecting '|'") + } + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing token after '|'") + } + switch { + case lex.isKeyword("fields"): + fp, err := parseFieldsPipe(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err) + } + pipes = append(pipes, fp) + case lex.isKeyword("stats"): + sp, err := parseStatsPipe(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err) + } + pipes = append(pipes, sp) + default: + return nil, fmt.Errorf("unexpected pipe %q", lex.token) + } + } + return pipes, nil +} + +type fieldsPipe struct { + // fields contains list of fields to fetch + fields []string +} + +func (fp *fieldsPipe) String() string { + if len(fp.fields) == 0 { + logger.Panicf("BUG: fieldsPipe must contain at least a single field") + } + return "fields " + fieldNamesString(fp.fields) +} + +func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) { + var fields []string + for { + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing field name") + } + if lex.isKeyword(",") { + return nil, fmt.Errorf("unexpected ','; expecting field name") + } + field := parseFieldName(lex) + fields = append(fields, field) + switch { + case lex.isKeyword("|", ""): + fp := &fieldsPipe{ + fields: fields, + } + return fp, nil + case lex.isKeyword(","): + default: + return nil, fmt.Errorf("unexpected token: %q; expecting ',' or '|'", lex.token) + } + } +} + +type statsPipe struct { + byFields []string + funcs []statsFunc +} + +type statsFunc interface { + // String returns string representation of statsFunc + String() string + + // neededFields returns the needed fields for calculating the given stats + neededFields() []string +} + +func (sp *statsPipe) String() string { + s := "stats " + if len(sp.byFields) > 0 { + s += "by (" + fieldNamesString(sp.byFields) + ") " + } + + if len(sp.funcs) == 0 { + logger.Panicf("BUG: statsPipe must contain at least a single statsFunc") + } + a := make([]string, len(sp.funcs)) + for i, f := range sp.funcs { + a[i] = f.String() + } + s += strings.Join(a, ", ") + return s +} + +func (sp *statsPipe) neededFields() []string { + var neededFields []string + m := make(map[string]struct{}) + updateNeededFields := func(fields []string) { + for _, field := range fields { + if _, ok := m[field]; !ok { + m[field] = struct{}{} + neededFields = append(neededFields, field) + } + } + } + + updateNeededFields(sp.byFields) + + for _, f := range sp.funcs { + fields := f.neededFields() + updateNeededFields(fields) + } + + return neededFields +} + +func parseStatsPipe(lex *lexer) (*statsPipe, error) { + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing stats config") + } + + var sp statsPipe + if lex.isKeyword("by") { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'by': %w", err) + } + sp.byFields = fields + } + + var funcs []statsFunc + for { + sf, err := parseStatsFunc(lex) + if err != nil { + return nil, err + } + funcs = append(funcs, sf) + if lex.isKeyword("|", "") { + sp.funcs = funcs + return &sp, nil + } + if !lex.isKeyword(",") { + return nil, fmt.Errorf("unexpected token %q; want ',' or '|'", lex.token) + } + lex.nextToken() + } +} + +func parseStatsFunc(lex *lexer) (statsFunc, error) { + switch { + case lex.isKeyword("count"): + sfc, err := parseStatsFuncCount(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'count' func: %w", err) + } + return sfc, nil + default: + return nil, fmt.Errorf("unknown stats func %q", lex.token) + } +} + +type statsFuncCount struct { + fields []string + resultName string +} + +func (sfc *statsFuncCount) String() string { + fields := getFieldsIgnoreStar(sfc.fields) + return "count(" + fieldNamesString(fields) + ") as " + quoteTokenIfNeeded(sfc.resultName) +} + +func (sfc *statsFuncCount) neededFields() []string { + return getFieldsIgnoreStar(sfc.fields) +} + +func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) { + lex.nextToken() + fields, err := parseFieldNamesInParens(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'count' args: %w", err) + } + + if !lex.isKeyword("as") { + return nil, fmt.Errorf("missing 'as' keyword") + } + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing token after 'as' keyword") + } + resultName := parseFieldName(lex) + + sfc := &statsFuncCount{ + fields: fields, + resultName: resultName, + } + return sfc, nil +} + +func parseFieldNamesInParens(lex *lexer) ([]string, error) { + if !lex.isKeyword("(") { + return nil, fmt.Errorf("missing `(`") + } + var fields []string + for { + if !lex.mustNextToken() { + return nil, fmt.Errorf("missing field name or ')'") + } + if lex.isKeyword(")") { + lex.nextToken() + return fields, nil + } + if lex.isKeyword(",") { + return nil, fmt.Errorf("unexpected `,`") + } + field := parseFieldName(lex) + fields = append(fields, field) + switch { + case lex.isKeyword(")"): + lex.nextToken() + return fields, nil + case lex.isKeyword(","): + default: + return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token) + } + } +} + +func parseFieldName(lex *lexer) string { + s := lex.token + lex.nextToken() + for !lex.isSkippedSpace && !lex.isKeyword(",", "|", ")", "") { + s += lex.rawToken + lex.nextToken() + } + return s +} + +func fieldNamesString(fields []string) string { + a := make([]string, len(fields)) + for i, f := range fields { + if f != "*" { + f = quoteTokenIfNeeded(f) + } + a[i] = f + } + return strings.Join(a, ", ") +} + +func getFieldsIgnoreStar(fields []string) []string { + var result []string + for _, f := range fields { + if f != "*" { + result = append(result, f) + } + } + return result +} diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 6a95db2ee..b0633862c 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -43,15 +43,16 @@ type searchOptions struct { } // RunQuery runs the given q and calls processBlock for results -func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn)) { +func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []BlockColumn)) { resultColumnNames := q.getResultColumnNames() so := &genericSearchOptions{ tenantIDs: tenantIDs, filter: q.f, resultColumnNames: resultColumnNames, } + workersCount := cgroup.AvailableCPUs() - s.search(workersCount, so, stopCh, func(_ uint, br *blockResult) { + s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) { brs := getBlockRows() cs := brs.cs @@ -61,7 +62,8 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{ Values: br.getColumnValues(i), }) } - processBlock(cs) + rowsCount := br.RowsCount() + processBlock(workerID, rowsCount, cs) brs.cs = cs putBlockRows(brs) diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 886ec56fe..544e55cc7 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -84,8 +84,8 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 0, ProjectID: 0, } - processBlock := func(_ []BlockColumn) { - panic(fmt.Errorf("unexpected match")) + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -96,8 +96,8 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(_ []BlockColumn) { - panic(fmt.Errorf("unexpected match")) + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -110,15 +110,15 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: uint32(10*i + 1), } expectedTenantID := tenantID.String() - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, columns []BlockColumn) { hasTenantIDColumn := false var columnNames []string for _, c := range columns { if c.Name == "tenant.id" { hasTenantIDColumn = true - if len(c.Values) == 0 { - panic(fmt.Errorf("unexpected zero rows")) + if len(c.Values) != rowsCount { + panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), rowsCount)) } for _, v := range c.Values { if v != expectedTenantID { @@ -131,47 +131,47 @@ func TestStorageRunQuery(t *testing.T) { if !hasTenantIDColumn { panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames)) } - rowsCount.Add(uint32(len(columns[0].Values))) + rowsCountTotal.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } } }) t.Run("matching-multiple-tenant-ids", func(t *testing.T) { q := mustParseQuery(`"log message"`) - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { - rowsCount.Add(uint32(len(columns[0].Values))) + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + rowsCountTotal.Add(uint32(rowsCount)) } s.RunQuery(allTenantIDs, q, nil, processBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("matching-in-filter", func(t *testing.T) { q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`) - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { - rowsCount.Add(uint32(len(columns[0].Values))) + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + rowsCountTotal.Add(uint32(rowsCount)) } s.RunQuery(allTenantIDs, q, nil, processBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) t.Run("stream-filter-mismatch", func(_ *testing.T) { q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`) - processBlock := func(_ []BlockColumn) { - panic(fmt.Errorf("unexpected match")) + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) } s.RunQuery(allTenantIDs, q, nil, processBlock) }) @@ -183,15 +183,15 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } expectedStreamID := fmt.Sprintf("stream_id=%d", i) - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, columns []BlockColumn) { hasStreamIDColumn := false var columnNames []string for _, c := range columns { if c.Name == "stream-id" { hasStreamIDColumn = true - if len(c.Values) == 0 { - panic(fmt.Errorf("unexpected zero rows")) + if len(c.Values) != rowsCount { + panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), rowsCount)) } for _, v := range c.Values { if v != expectedStreamID { @@ -204,13 +204,13 @@ func TestStorageRunQuery(t *testing.T) { if !hasStreamIDColumn { panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames)) } - rowsCount.Add(uint32(len(columns[0].Values))) + rowsCountTotal.Add(uint32(len(columns[0].Values))) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount) } } @@ -221,15 +221,15 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { - rowsCount.Add(uint32(len(columns[0].Values))) + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + rowsCountTotal.Add(uint32(rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) @@ -241,15 +241,15 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { - rowsCount.Add(uint32(len(columns[0].Values))) + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + rowsCountTotal.Add(uint32(rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) @@ -261,15 +261,15 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - var rowsCount atomic.Uint32 - processBlock := func(columns []BlockColumn) { - rowsCount.Add(uint32(len(columns[0].Values))) + var rowsCountTotal atomic.Uint32 + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + rowsCountTotal.Add(uint32(rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) expectedRowsCount := blocksPerStream - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) @@ -281,8 +281,8 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(_ []BlockColumn) { - panic(fmt.Errorf("unexpected match")) + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -295,8 +295,8 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(_ []BlockColumn) { - panic(fmt.Errorf("unexpected match")) + processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) } tenantIDs := []TenantID{tenantID} s.RunQuery(tenantIDs, q, nil, processBlock) @@ -460,17 +460,17 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - var rowsCount atomic.Uint32 + var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCount.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } } @@ -484,14 +484,14 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - var rowsCount atomic.Uint32 + var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCount.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount) } }) @@ -525,17 +525,17 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - var rowsCount atomic.Uint32 + var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCount.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } } @@ -554,17 +554,17 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - var rowsCount atomic.Uint32 + var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCount.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) @@ -591,17 +591,17 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - var rowsCount atomic.Uint32 + var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { if !br.streamID.tenantID.equal(&tenantID) { panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID)) } - rowsCount.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } }) @@ -619,14 +619,14 @@ func TestStorageSearch(t *testing.T) { filter: f, resultColumnNames: []string{"_msg"}, } - var rowsCount atomic.Uint32 + var rowsCountTotal atomic.Uint32 processBlock := func(_ uint, br *blockResult) { - rowsCount.Add(uint32(br.RowsCount())) + rowsCountTotal.Add(uint32(br.RowsCount())) } s.search(workersCount, so, nil, processBlock) expectedRowsCount := blocksPerStream - if n := rowsCount.Load(); n != uint32(expectedRowsCount) { + if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount) } })