mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
5512787b72
commit
65f09bc641
9 changed files with 374 additions and 149 deletions
|
@ -42,11 +42,10 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
|
||||||
sw := getSortWriter()
|
sw := getSortWriter()
|
||||||
sw.Init(w, maxSortBufferSize.IntN(), limit)
|
sw.Init(w, maxSortBufferSize.IntN(), limit)
|
||||||
tenantIDs := []logstorage.TenantID{tenantID}
|
tenantIDs := []logstorage.TenantID{tenantID}
|
||||||
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
|
vlstorage.RunQuery(tenantIDs, q, stopCh, func(_ uint, rowsCount int, columns []logstorage.BlockColumn) {
|
||||||
if len(columns) == 0 {
|
if len(columns) == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rowsCount := len(columns[0].Values)
|
|
||||||
|
|
||||||
bb := blockResultPool.Get()
|
bb := blockResultPool.Get()
|
||||||
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
|
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
|
||||||
|
|
|
@ -100,7 +100,7 @@ func MustAddRows(lr *logstorage.LogRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunQuery runs the given q and calls processBlock for the returned data blocks
|
// RunQuery runs the given q and calls processBlock for the returned data blocks
|
||||||
func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) {
|
func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []logstorage.BlockColumn)) {
|
||||||
strg.RunQuery(tenantIDs, q, stopCh, processBlock)
|
strg.RunQuery(tenantIDs, q, stopCh, processBlock)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1056,7 +1056,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying
|
||||||
LogsQL will support calculating the following stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
|
LogsQL will support calculating the following stats based on the [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
|
||||||
and fields created by [transformations](#transformations):
|
and fields created by [transformations](#transformations):
|
||||||
|
|
||||||
- The number of selected logs.
|
- The number of selected logs via `query | stats count() as total` syntax.
|
||||||
- The number of non-empty values for the given field.
|
- The number of non-empty values for the given field.
|
||||||
- The number of unique values for the given field.
|
- The number of unique values for the given field.
|
||||||
- The min, max, avg, and sum for the given field.
|
- The min, max, avg, and sum for the given field.
|
||||||
|
|
|
@ -399,17 +399,19 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *filterBitmap) {
|
||||||
|
|
||||||
br.streamID = bs.bsw.bh.streamID
|
br.streamID = bs.bsw.bh.streamID
|
||||||
|
|
||||||
if !bm.isZero() {
|
if bm.isZero() {
|
||||||
// Initialize timestamps, since they are used for determining the number of rows in br.RowsCount()
|
// Nothing to initialize for zero matching log entries in the block.
|
||||||
srcTimestamps := bs.getTimestamps()
|
return
|
||||||
dstTimestamps := br.timestamps[:0]
|
|
||||||
bm.forEachSetBit(func(idx int) bool {
|
|
||||||
ts := srcTimestamps[idx]
|
|
||||||
dstTimestamps = append(dstTimestamps, ts)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
br.timestamps = dstTimestamps
|
|
||||||
}
|
}
|
||||||
|
// Initialize timestamps, since they are used for determining the number of rows in br.RowsCount()
|
||||||
|
srcTimestamps := bs.getTimestamps()
|
||||||
|
dstTimestamps := br.timestamps[:0]
|
||||||
|
bm.forEachSetBit(func(idx int) bool {
|
||||||
|
ts := srcTimestamps[idx]
|
||||||
|
dstTimestamps = append(dstTimestamps, ts)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
br.timestamps = dstTimestamps
|
||||||
}
|
}
|
||||||
|
|
||||||
func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *filterBitmap) {
|
func (br *blockResult) addColumn(bs *blockSearch, ch *columnHeader, bm *filterBitmap) {
|
||||||
|
|
|
@ -187,31 +187,28 @@ func (lex *lexer) nextToken() {
|
||||||
type Query struct {
|
type Query struct {
|
||||||
f filter
|
f filter
|
||||||
|
|
||||||
// fields contains optional list of fields to fetch
|
pipes []pipe
|
||||||
fields []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// String returns string representation for q.
|
// String returns string representation for q.
|
||||||
func (q *Query) String() string {
|
func (q *Query) String() string {
|
||||||
s := q.f.String()
|
s := q.f.String()
|
||||||
|
|
||||||
if len(q.fields) > 0 {
|
for _, p := range q.pipes {
|
||||||
a := make([]string, len(q.fields))
|
s += " | " + p.String()
|
||||||
for i, f := range q.fields {
|
|
||||||
if f != "*" {
|
|
||||||
f = quoteTokenIfNeeded(f)
|
|
||||||
}
|
|
||||||
a[i] = f
|
|
||||||
}
|
|
||||||
s += " | fields " + strings.Join(a, ", ")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Query) getResultColumnNames() []string {
|
func (q *Query) getResultColumnNames() []string {
|
||||||
if len(q.fields) > 0 {
|
for _, p := range q.pipes {
|
||||||
return q.fields
|
switch t := p.(type) {
|
||||||
|
case *fieldsPipe:
|
||||||
|
return t.fields
|
||||||
|
case *statsPipe:
|
||||||
|
return t.neededFields()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return []string{"*"}
|
return []string{"*"}
|
||||||
}
|
}
|
||||||
|
@ -228,68 +225,15 @@ func ParseQuery(s string) (*Query, error) {
|
||||||
f: f,
|
f: f,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := q.parsePipes(lex); err != nil {
|
pipes, err := parsePipes(lex)
|
||||||
|
if err != nil {
|
||||||
return nil, fmt.Errorf("%w; context: %s", err, lex.context())
|
return nil, fmt.Errorf("%w; context: %s", err, lex.context())
|
||||||
}
|
}
|
||||||
|
q.pipes = pipes
|
||||||
|
|
||||||
return q, nil
|
return q, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *Query) parsePipes(lex *lexer) error {
|
|
||||||
for {
|
|
||||||
if lex.isEnd() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if !lex.isKeyword("|") {
|
|
||||||
return fmt.Errorf("expecting '|'")
|
|
||||||
}
|
|
||||||
if !lex.mustNextToken() {
|
|
||||||
return fmt.Errorf("missing token after '|'")
|
|
||||||
}
|
|
||||||
switch {
|
|
||||||
case lex.isKeyword("fields"):
|
|
||||||
if err := q.parseFieldsPipe(lex); err != nil {
|
|
||||||
return fmt.Errorf("cannot parse fields pipe: %w", err)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unexpected pipe %q", lex.token)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (q *Query) parseFieldsPipe(lex *lexer) error {
|
|
||||||
var fields []string
|
|
||||||
|
|
||||||
for {
|
|
||||||
if !lex.mustNextToken() {
|
|
||||||
return fmt.Errorf("missing field name")
|
|
||||||
}
|
|
||||||
if lex.isKeyword(",") {
|
|
||||||
return fmt.Errorf("unexpected ','; expecting field name")
|
|
||||||
}
|
|
||||||
field := parseFieldName(lex)
|
|
||||||
fields = append(fields, field)
|
|
||||||
switch {
|
|
||||||
case lex.isKeyword("|", ""):
|
|
||||||
q.fields = fields
|
|
||||||
return nil
|
|
||||||
case lex.isKeyword(","):
|
|
||||||
default:
|
|
||||||
return fmt.Errorf("unexpected token: %q; expecting ','", lex.token)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseFieldName(lex *lexer) string {
|
|
||||||
s := lex.token
|
|
||||||
lex.nextToken()
|
|
||||||
for !lex.isSkippedSpace && !lex.isKeyword(",", "|", "") {
|
|
||||||
s += lex.rawToken
|
|
||||||
lex.nextToken()
|
|
||||||
}
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseFilter(lex *lexer) (filter, error) {
|
func parseFilter(lex *lexer) (filter, error) {
|
||||||
if !lex.mustNextToken() || lex.isKeyword("|") {
|
if !lex.mustNextToken() || lex.isKeyword("|") {
|
||||||
return nil, fmt.Errorf("missing query")
|
return nil, fmt.Errorf("missing query")
|
||||||
|
|
|
@ -805,11 +805,18 @@ func TestParseQuerySuccess(t *testing.T) {
|
||||||
`(_time:(2023-04-20,now] or _time:[-10m,-1m)) (_stream:{job="a"} or _stream:{instance!="b"}) (err* or ip:ipv4_range(1.2.3.0, 1.2.3.255) !ip:1.2.3.4)`)
|
`(_time:(2023-04-20,now] or _time:[-10m,-1m)) (_stream:{job="a"} or _stream:{instance!="b"}) (err* or ip:ipv4_range(1.2.3.0, 1.2.3.255) !ip:1.2.3.4)`)
|
||||||
|
|
||||||
// fields pipe
|
// fields pipe
|
||||||
f(`foo | fields *`, `foo | fields *`)
|
f(`foo|fields *`, `foo | fields *`)
|
||||||
f(`foo | fields bar`, `foo | fields bar`)
|
f(`foo | fields bar`, `foo | fields bar`)
|
||||||
f(`foo | FIELDS bar,Baz , "a,b|c"`, `foo | fields bar, Baz, "a,b|c"`)
|
f(`foo|FIELDS bar,Baz , "a,b|c"`, `foo | fields bar, Baz, "a,b|c"`)
|
||||||
f(`foo | Fields x.y:z/a, _b$c`, `foo | fields "x.y:z/a", "_b$c"`)
|
f(`foo | Fields x.y:z/a, _b$c`, `foo | fields "x.y:z/a", "_b$c"`)
|
||||||
f(`foo | fields bar | fields baz, abc`, `foo | fields baz, abc`)
|
|
||||||
|
// multiple fields pipes
|
||||||
|
f(`foo | fields bar | fields baz, abc`, `foo | fields bar | fields baz, abc`)
|
||||||
|
|
||||||
|
// stats count pipe
|
||||||
|
f(`* | Stats count() AS foo`, `* | stats count() as foo`)
|
||||||
|
f(`* | STATS bY (foo, b.a/r, "b az") count(*) as XYz`, `* | stats by (foo, "b.a/r", "b az") count() as XYz`)
|
||||||
|
f(`* | stats by() count(x, 'a).b,c|d') as qwert`, `* | stats count(x, "a).b,c|d") as qwert`)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestParseQueryFailure(t *testing.T) {
|
func TestParseQueryFailure(t *testing.T) {
|
||||||
|
|
271
lib/logstorage/pipes.go
Normal file
271
lib/logstorage/pipes.go
Normal file
|
@ -0,0 +1,271 @@
|
||||||
|
package logstorage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||||
|
)
|
||||||
|
|
||||||
|
type pipe interface {
|
||||||
|
String() string
|
||||||
|
}
|
||||||
|
|
||||||
|
func parsePipes(lex *lexer) ([]pipe, error) {
|
||||||
|
var pipes []pipe
|
||||||
|
for !lex.isEnd() {
|
||||||
|
if !lex.isKeyword("|") {
|
||||||
|
return nil, fmt.Errorf("expecting '|'")
|
||||||
|
}
|
||||||
|
if !lex.mustNextToken() {
|
||||||
|
return nil, fmt.Errorf("missing token after '|'")
|
||||||
|
}
|
||||||
|
switch {
|
||||||
|
case lex.isKeyword("fields"):
|
||||||
|
fp, err := parseFieldsPipe(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'fields' pipe: %w", err)
|
||||||
|
}
|
||||||
|
pipes = append(pipes, fp)
|
||||||
|
case lex.isKeyword("stats"):
|
||||||
|
sp, err := parseStatsPipe(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
|
||||||
|
}
|
||||||
|
pipes = append(pipes, sp)
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return pipes, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type fieldsPipe struct {
|
||||||
|
// fields contains list of fields to fetch
|
||||||
|
fields []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fp *fieldsPipe) String() string {
|
||||||
|
if len(fp.fields) == 0 {
|
||||||
|
logger.Panicf("BUG: fieldsPipe must contain at least a single field")
|
||||||
|
}
|
||||||
|
return "fields " + fieldNamesString(fp.fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
|
||||||
|
var fields []string
|
||||||
|
for {
|
||||||
|
if !lex.mustNextToken() {
|
||||||
|
return nil, fmt.Errorf("missing field name")
|
||||||
|
}
|
||||||
|
if lex.isKeyword(",") {
|
||||||
|
return nil, fmt.Errorf("unexpected ','; expecting field name")
|
||||||
|
}
|
||||||
|
field := parseFieldName(lex)
|
||||||
|
fields = append(fields, field)
|
||||||
|
switch {
|
||||||
|
case lex.isKeyword("|", ""):
|
||||||
|
fp := &fieldsPipe{
|
||||||
|
fields: fields,
|
||||||
|
}
|
||||||
|
return fp, nil
|
||||||
|
case lex.isKeyword(","):
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected token: %q; expecting ',' or '|'", lex.token)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsPipe struct {
|
||||||
|
byFields []string
|
||||||
|
funcs []statsFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsFunc interface {
|
||||||
|
// String returns string representation of statsFunc
|
||||||
|
String() string
|
||||||
|
|
||||||
|
// neededFields returns the needed fields for calculating the given stats
|
||||||
|
neededFields() []string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sp *statsPipe) String() string {
|
||||||
|
s := "stats "
|
||||||
|
if len(sp.byFields) > 0 {
|
||||||
|
s += "by (" + fieldNamesString(sp.byFields) + ") "
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(sp.funcs) == 0 {
|
||||||
|
logger.Panicf("BUG: statsPipe must contain at least a single statsFunc")
|
||||||
|
}
|
||||||
|
a := make([]string, len(sp.funcs))
|
||||||
|
for i, f := range sp.funcs {
|
||||||
|
a[i] = f.String()
|
||||||
|
}
|
||||||
|
s += strings.Join(a, ", ")
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sp *statsPipe) neededFields() []string {
|
||||||
|
var neededFields []string
|
||||||
|
m := make(map[string]struct{})
|
||||||
|
updateNeededFields := func(fields []string) {
|
||||||
|
for _, field := range fields {
|
||||||
|
if _, ok := m[field]; !ok {
|
||||||
|
m[field] = struct{}{}
|
||||||
|
neededFields = append(neededFields, field)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
updateNeededFields(sp.byFields)
|
||||||
|
|
||||||
|
for _, f := range sp.funcs {
|
||||||
|
fields := f.neededFields()
|
||||||
|
updateNeededFields(fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
return neededFields
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStatsPipe(lex *lexer) (*statsPipe, error) {
|
||||||
|
if !lex.mustNextToken() {
|
||||||
|
return nil, fmt.Errorf("missing stats config")
|
||||||
|
}
|
||||||
|
|
||||||
|
var sp statsPipe
|
||||||
|
if lex.isKeyword("by") {
|
||||||
|
lex.nextToken()
|
||||||
|
fields, err := parseFieldNamesInParens(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'by': %w", err)
|
||||||
|
}
|
||||||
|
sp.byFields = fields
|
||||||
|
}
|
||||||
|
|
||||||
|
var funcs []statsFunc
|
||||||
|
for {
|
||||||
|
sf, err := parseStatsFunc(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
funcs = append(funcs, sf)
|
||||||
|
if lex.isKeyword("|", "") {
|
||||||
|
sp.funcs = funcs
|
||||||
|
return &sp, nil
|
||||||
|
}
|
||||||
|
if !lex.isKeyword(",") {
|
||||||
|
return nil, fmt.Errorf("unexpected token %q; want ',' or '|'", lex.token)
|
||||||
|
}
|
||||||
|
lex.nextToken()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStatsFunc(lex *lexer) (statsFunc, error) {
|
||||||
|
switch {
|
||||||
|
case lex.isKeyword("count"):
|
||||||
|
sfc, err := parseStatsFuncCount(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'count' func: %w", err)
|
||||||
|
}
|
||||||
|
return sfc, nil
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown stats func %q", lex.token)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type statsFuncCount struct {
|
||||||
|
fields []string
|
||||||
|
resultName string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sfc *statsFuncCount) String() string {
|
||||||
|
fields := getFieldsIgnoreStar(sfc.fields)
|
||||||
|
return "count(" + fieldNamesString(fields) + ") as " + quoteTokenIfNeeded(sfc.resultName)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sfc *statsFuncCount) neededFields() []string {
|
||||||
|
return getFieldsIgnoreStar(sfc.fields)
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
|
||||||
|
lex.nextToken()
|
||||||
|
fields, err := parseFieldNamesInParens(lex)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("cannot parse 'count' args: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !lex.isKeyword("as") {
|
||||||
|
return nil, fmt.Errorf("missing 'as' keyword")
|
||||||
|
}
|
||||||
|
if !lex.mustNextToken() {
|
||||||
|
return nil, fmt.Errorf("missing token after 'as' keyword")
|
||||||
|
}
|
||||||
|
resultName := parseFieldName(lex)
|
||||||
|
|
||||||
|
sfc := &statsFuncCount{
|
||||||
|
fields: fields,
|
||||||
|
resultName: resultName,
|
||||||
|
}
|
||||||
|
return sfc, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseFieldNamesInParens(lex *lexer) ([]string, error) {
|
||||||
|
if !lex.isKeyword("(") {
|
||||||
|
return nil, fmt.Errorf("missing `(`")
|
||||||
|
}
|
||||||
|
var fields []string
|
||||||
|
for {
|
||||||
|
if !lex.mustNextToken() {
|
||||||
|
return nil, fmt.Errorf("missing field name or ')'")
|
||||||
|
}
|
||||||
|
if lex.isKeyword(")") {
|
||||||
|
lex.nextToken()
|
||||||
|
return fields, nil
|
||||||
|
}
|
||||||
|
if lex.isKeyword(",") {
|
||||||
|
return nil, fmt.Errorf("unexpected `,`")
|
||||||
|
}
|
||||||
|
field := parseFieldName(lex)
|
||||||
|
fields = append(fields, field)
|
||||||
|
switch {
|
||||||
|
case lex.isKeyword(")"):
|
||||||
|
lex.nextToken()
|
||||||
|
return fields, nil
|
||||||
|
case lex.isKeyword(","):
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unexpected token: %q; expecting ',' or ')'", lex.token)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseFieldName(lex *lexer) string {
|
||||||
|
s := lex.token
|
||||||
|
lex.nextToken()
|
||||||
|
for !lex.isSkippedSpace && !lex.isKeyword(",", "|", ")", "") {
|
||||||
|
s += lex.rawToken
|
||||||
|
lex.nextToken()
|
||||||
|
}
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
|
||||||
|
func fieldNamesString(fields []string) string {
|
||||||
|
a := make([]string, len(fields))
|
||||||
|
for i, f := range fields {
|
||||||
|
if f != "*" {
|
||||||
|
f = quoteTokenIfNeeded(f)
|
||||||
|
}
|
||||||
|
a[i] = f
|
||||||
|
}
|
||||||
|
return strings.Join(a, ", ")
|
||||||
|
}
|
||||||
|
|
||||||
|
func getFieldsIgnoreStar(fields []string) []string {
|
||||||
|
var result []string
|
||||||
|
for _, f := range fields {
|
||||||
|
if f != "*" {
|
||||||
|
result = append(result, f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
|
@ -43,15 +43,16 @@ type searchOptions struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunQuery runs the given q and calls processBlock for results
|
// RunQuery runs the given q and calls processBlock for results
|
||||||
func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn)) {
|
func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []BlockColumn)) {
|
||||||
resultColumnNames := q.getResultColumnNames()
|
resultColumnNames := q.getResultColumnNames()
|
||||||
so := &genericSearchOptions{
|
so := &genericSearchOptions{
|
||||||
tenantIDs: tenantIDs,
|
tenantIDs: tenantIDs,
|
||||||
filter: q.f,
|
filter: q.f,
|
||||||
resultColumnNames: resultColumnNames,
|
resultColumnNames: resultColumnNames,
|
||||||
}
|
}
|
||||||
|
|
||||||
workersCount := cgroup.AvailableCPUs()
|
workersCount := cgroup.AvailableCPUs()
|
||||||
s.search(workersCount, so, stopCh, func(_ uint, br *blockResult) {
|
s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) {
|
||||||
brs := getBlockRows()
|
brs := getBlockRows()
|
||||||
cs := brs.cs
|
cs := brs.cs
|
||||||
|
|
||||||
|
@ -61,7 +62,8 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
|
||||||
Values: br.getColumnValues(i),
|
Values: br.getColumnValues(i),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
processBlock(cs)
|
rowsCount := br.RowsCount()
|
||||||
|
processBlock(workerID, rowsCount, cs)
|
||||||
|
|
||||||
brs.cs = cs
|
brs.cs = cs
|
||||||
putBlockRows(brs)
|
putBlockRows(brs)
|
||||||
|
|
|
@ -84,8 +84,8 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 0,
|
AccountID: 0,
|
||||||
ProjectID: 0,
|
ProjectID: 0,
|
||||||
}
|
}
|
||||||
processBlock := func(_ []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
panic(fmt.Errorf("unexpected match"))
|
panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
@ -96,8 +96,8 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 1,
|
AccountID: 1,
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
processBlock := func(_ []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
panic(fmt.Errorf("unexpected match"))
|
panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
@ -110,15 +110,15 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
ProjectID: uint32(10*i + 1),
|
ProjectID: uint32(10*i + 1),
|
||||||
}
|
}
|
||||||
expectedTenantID := tenantID.String()
|
expectedTenantID := tenantID.String()
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, columns []BlockColumn) {
|
||||||
hasTenantIDColumn := false
|
hasTenantIDColumn := false
|
||||||
var columnNames []string
|
var columnNames []string
|
||||||
for _, c := range columns {
|
for _, c := range columns {
|
||||||
if c.Name == "tenant.id" {
|
if c.Name == "tenant.id" {
|
||||||
hasTenantIDColumn = true
|
hasTenantIDColumn = true
|
||||||
if len(c.Values) == 0 {
|
if len(c.Values) != rowsCount {
|
||||||
panic(fmt.Errorf("unexpected zero rows"))
|
panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), rowsCount))
|
||||||
}
|
}
|
||||||
for _, v := range c.Values {
|
for _, v := range c.Values {
|
||||||
if v != expectedTenantID {
|
if v != expectedTenantID {
|
||||||
|
@ -131,47 +131,47 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
if !hasTenantIDColumn {
|
if !hasTenantIDColumn {
|
||||||
panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
|
panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
|
||||||
}
|
}
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(len(columns[0].Values)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
|
t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
|
||||||
q := mustParseQuery(`"log message"`)
|
q := mustParseQuery(`"log message"`)
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(rowsCount))
|
||||||
}
|
}
|
||||||
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("matching-in-filter", func(t *testing.T) {
|
t.Run("matching-in-filter", func(t *testing.T) {
|
||||||
q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
|
q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(rowsCount))
|
||||||
}
|
}
|
||||||
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.Run("stream-filter-mismatch", func(_ *testing.T) {
|
t.Run("stream-filter-mismatch", func(_ *testing.T) {
|
||||||
q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
|
q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
|
||||||
processBlock := func(_ []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
panic(fmt.Errorf("unexpected match"))
|
panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
|
||||||
}
|
}
|
||||||
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
||||||
})
|
})
|
||||||
|
@ -183,15 +183,15 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
expectedStreamID := fmt.Sprintf("stream_id=%d", i)
|
expectedStreamID := fmt.Sprintf("stream_id=%d", i)
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, columns []BlockColumn) {
|
||||||
hasStreamIDColumn := false
|
hasStreamIDColumn := false
|
||||||
var columnNames []string
|
var columnNames []string
|
||||||
for _, c := range columns {
|
for _, c := range columns {
|
||||||
if c.Name == "stream-id" {
|
if c.Name == "stream-id" {
|
||||||
hasStreamIDColumn = true
|
hasStreamIDColumn = true
|
||||||
if len(c.Values) == 0 {
|
if len(c.Values) != rowsCount {
|
||||||
panic(fmt.Errorf("unexpected zero rows"))
|
panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), rowsCount))
|
||||||
}
|
}
|
||||||
for _, v := range c.Values {
|
for _, v := range c.Values {
|
||||||
if v != expectedStreamID {
|
if v != expectedStreamID {
|
||||||
|
@ -204,13 +204,13 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
if !hasStreamIDColumn {
|
if !hasStreamIDColumn {
|
||||||
panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
|
panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
|
||||||
}
|
}
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(len(columns[0].Values)))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := blocksPerStream * rowsPerBlock
|
expectedRowsCount := blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows for stream %d; got %d; want %d", i, n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -221,15 +221,15 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 1,
|
AccountID: 1,
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -241,15 +241,15 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 1,
|
AccountID: 1,
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream
|
expectedRowsCount := streamsPerTenant * blocksPerStream
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -261,15 +261,15 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 1,
|
AccountID: 1,
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(columns []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
rowsCount.Add(uint32(len(columns[0].Values)))
|
rowsCountTotal.Add(uint32(rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := blocksPerStream
|
expectedRowsCount := blocksPerStream
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -281,8 +281,8 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 1,
|
AccountID: 1,
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
processBlock := func(_ []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
panic(fmt.Errorf("unexpected match"))
|
panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
@ -295,8 +295,8 @@ func TestStorageRunQuery(t *testing.T) {
|
||||||
AccountID: 1,
|
AccountID: 1,
|
||||||
ProjectID: 11,
|
ProjectID: 11,
|
||||||
}
|
}
|
||||||
processBlock := func(_ []BlockColumn) {
|
processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
|
||||||
panic(fmt.Errorf("unexpected match"))
|
panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
|
||||||
}
|
}
|
||||||
tenantIDs := []TenantID{tenantID}
|
tenantIDs := []TenantID{tenantID}
|
||||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||||
|
@ -460,17 +460,17 @@ func TestStorageSearch(t *testing.T) {
|
||||||
filter: f,
|
filter: f,
|
||||||
resultColumnNames: []string{"_msg"},
|
resultColumnNames: []string{"_msg"},
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(_ uint, br *blockResult) {
|
processBlock := func(_ uint, br *blockResult) {
|
||||||
if !br.streamID.tenantID.equal(&tenantID) {
|
if !br.streamID.tenantID.equal(&tenantID) {
|
||||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||||
}
|
}
|
||||||
rowsCount.Add(uint32(br.RowsCount()))
|
rowsCountTotal.Add(uint32(br.RowsCount()))
|
||||||
}
|
}
|
||||||
s.search(workersCount, so, nil, processBlock)
|
s.search(workersCount, so, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -484,14 +484,14 @@ func TestStorageSearch(t *testing.T) {
|
||||||
filter: f,
|
filter: f,
|
||||||
resultColumnNames: []string{"_msg"},
|
resultColumnNames: []string{"_msg"},
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(_ uint, br *blockResult) {
|
processBlock := func(_ uint, br *blockResult) {
|
||||||
rowsCount.Add(uint32(br.RowsCount()))
|
rowsCountTotal.Add(uint32(br.RowsCount()))
|
||||||
}
|
}
|
||||||
s.search(workersCount, so, nil, processBlock)
|
s.search(workersCount, so, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of matching rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -525,17 +525,17 @@ func TestStorageSearch(t *testing.T) {
|
||||||
filter: f,
|
filter: f,
|
||||||
resultColumnNames: []string{"_msg"},
|
resultColumnNames: []string{"_msg"},
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(_ uint, br *blockResult) {
|
processBlock := func(_ uint, br *blockResult) {
|
||||||
if !br.streamID.tenantID.equal(&tenantID) {
|
if !br.streamID.tenantID.equal(&tenantID) {
|
||||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||||
}
|
}
|
||||||
rowsCount.Add(uint32(br.RowsCount()))
|
rowsCountTotal.Add(uint32(br.RowsCount()))
|
||||||
}
|
}
|
||||||
s.search(workersCount, so, nil, processBlock)
|
s.search(workersCount, so, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := blocksPerStream * rowsPerBlock
|
expectedRowsCount := blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -554,17 +554,17 @@ func TestStorageSearch(t *testing.T) {
|
||||||
filter: f,
|
filter: f,
|
||||||
resultColumnNames: []string{"_msg"},
|
resultColumnNames: []string{"_msg"},
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(_ uint, br *blockResult) {
|
processBlock := func(_ uint, br *blockResult) {
|
||||||
if !br.streamID.tenantID.equal(&tenantID) {
|
if !br.streamID.tenantID.equal(&tenantID) {
|
||||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||||
}
|
}
|
||||||
rowsCount.Add(uint32(br.RowsCount()))
|
rowsCountTotal.Add(uint32(br.RowsCount()))
|
||||||
}
|
}
|
||||||
s.search(workersCount, so, nil, processBlock)
|
s.search(workersCount, so, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -591,17 +591,17 @@ func TestStorageSearch(t *testing.T) {
|
||||||
filter: f,
|
filter: f,
|
||||||
resultColumnNames: []string{"_msg"},
|
resultColumnNames: []string{"_msg"},
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(_ uint, br *blockResult) {
|
processBlock := func(_ uint, br *blockResult) {
|
||||||
if !br.streamID.tenantID.equal(&tenantID) {
|
if !br.streamID.tenantID.equal(&tenantID) {
|
||||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||||
}
|
}
|
||||||
rowsCount.Add(uint32(br.RowsCount()))
|
rowsCountTotal.Add(uint32(br.RowsCount()))
|
||||||
}
|
}
|
||||||
s.search(workersCount, so, nil, processBlock)
|
s.search(workersCount, so, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
expectedRowsCount := streamsPerTenant * blocksPerStream * 2
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -619,14 +619,14 @@ func TestStorageSearch(t *testing.T) {
|
||||||
filter: f,
|
filter: f,
|
||||||
resultColumnNames: []string{"_msg"},
|
resultColumnNames: []string{"_msg"},
|
||||||
}
|
}
|
||||||
var rowsCount atomic.Uint32
|
var rowsCountTotal atomic.Uint32
|
||||||
processBlock := func(_ uint, br *blockResult) {
|
processBlock := func(_ uint, br *blockResult) {
|
||||||
rowsCount.Add(uint32(br.RowsCount()))
|
rowsCountTotal.Add(uint32(br.RowsCount()))
|
||||||
}
|
}
|
||||||
s.search(workersCount, so, nil, processBlock)
|
s.search(workersCount, so, nil, processBlock)
|
||||||
|
|
||||||
expectedRowsCount := blocksPerStream
|
expectedRowsCount := blocksPerStream
|
||||||
if n := rowsCount.Load(); n != uint32(expectedRowsCount) {
|
if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
|
||||||
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
t.Fatalf("unexpected number of rows; got %d; want %d", n, expectedRowsCount)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue