diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index 9bbdfc357..dbb12ce01 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -1,6 +1,7 @@ package logsql import ( + "context" "net/http" "github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage" @@ -17,8 +18,8 @@ var ( "too big value for this flag may result in high memory usage since the sorting is performed in memory") ) -// ProcessQueryRequest handles /select/logsql/query request -func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}, cancel func()) { +// ProcessQueryRequest handles /select/logsql/query request. +func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) { // Extract tenantID tenantID, err := logstorage.GetTenantIDFromRequest(r) if err != nil { @@ -42,14 +43,18 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s sw := getSortWriter() sw.Init(w, maxSortBufferSize.IntN(), limit) tenantIDs := []logstorage.TenantID{tenantID} - vlstorage.RunQuery(tenantIDs, q, stopCh, func(_ uint, rowsCount int, columns []logstorage.BlockColumn) { + + ctxWithCancel, cancel := context.WithCancel(ctx) + defer cancel() + + writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { if len(columns) == 0 { return } bb := blockResultPool.Get() - for rowIdx := 0; rowIdx < rowsCount; rowIdx++ { - WriteJSONRow(bb, columns, rowIdx) + for i := range timestamps { + WriteJSONRow(bb, columns, i) } if !sw.TryWrite(bb.B) { @@ -57,7 +62,10 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s } blockResultPool.Put(bb) - }) + } + + vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock) + sw.FinalFlush() putSortWriter(sw) } diff --git a/app/vlselect/main.go b/app/vlselect/main.go index 14e80dd4c..6e6289f55 100644 --- a/app/vlselect/main.go +++ b/app/vlselect/main.go @@ -1,7 +1,6 @@ package vlselect import ( - "context" "embed" "flag" "fmt" @@ -141,15 +140,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { } } - ctxWithCancel, cancel := context.WithCancel(ctx) - defer cancel() - stopCh = ctxWithCancel.Done() - switch { case path == "/logsql/query": logsqlQueryRequests.Inc() httpserver.EnableCORS(w, r) - logsql.ProcessQueryRequest(w, r, stopCh, cancel) + logsql.ProcessQueryRequest(ctx, w, r) return true default: return false diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index da28aca44..636368b8a 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -1,6 +1,7 @@ package vlstorage import ( + "context" "flag" "fmt" "net/http" @@ -99,9 +100,9 @@ func MustAddRows(lr *logstorage.LogRows) { strg.MustAddRows(lr) } -// RunQuery runs the given q and calls processBlock for the returned data blocks -func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []logstorage.BlockColumn)) { - strg.RunQuery(tenantIDs, q, stopCh, processBlock) +// RunQuery runs the given q and calls writeBlock for the returned data blocks +func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) { + strg.RunQuery(ctx, tenantIDs, q, writeBlock) } func initStorageMetrics(strg *logstorage.Storage) *metrics.Set { diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go index fb027d4c2..22b8652bd 100644 --- a/lib/logstorage/pipes.go +++ b/lib/logstorage/pipes.go @@ -2,13 +2,59 @@ package logstorage import ( "fmt" + "slices" + "strconv" "strings" + "unsafe" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" ) type pipe interface { + // String returns string representation of the pipe. String() string + + // newPipeProcessor must return new pipeProcessor for the given ppBase. + // + // workersCount is the number of goroutine workers, which will call writeBlock() method. + // + // If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds. + // It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds. + // + // The returned pipeProcessor may call cancel() at any time in order to stop ppBase. + newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor +} + +// pipeProcessor must process a single pipe. +type pipeProcessor interface { + // writeBlock must write the given block of data to the given pipeProcessor. + // + // The workerID is the id of the worker goroutine, which called the writeBlock. + // It is in the range 0 ... workersCount-1 . + // + // It is forbidden to hold references to columns after returning from writeBlock, since the caller re-uses columns. + writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) + + // flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor. + // + // The pipeProcessor must call ppBase.flush() and cancel(), which has been passed to newPipeProcessor, before returning from the flush. + flush() +} + +type defaultPipeProcessor func(workerID uint, timestamps []int64, columns []BlockColumn) + +func newDefaultPipeProcessor(writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) pipeProcessor { + return defaultPipeProcessor(writeBlock) +} + +func (dpp defaultPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { + dpp(workerID, timestamps, columns) +} + +func (dpp defaultPipeProcessor) flush() { + // Nothing to do } func parsePipes(lex *lexer) ([]pipe, error) { @@ -52,6 +98,47 @@ func (fp *fieldsPipe) String() string { return "fields " + fieldNamesString(fp.fields) } +func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + return &fieldsPipeProcessor{ + fp: fp, + cancel: cancel, + ppBase: ppBase, + } +} + +type fieldsPipeProcessor struct { + fp *fieldsPipe + cancel func() + ppBase pipeProcessor +} + +func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { + if slices.Contains(fpp.fp.fields, "*") || areSameBlockColumns(columns, fpp.fp.fields) { + // Fast path - there is no need in additional transformations before writing the block to ppBase. + fpp.ppBase.writeBlock(workerID, timestamps, columns) + return + } + + // Slow path - construct columns for fpp.fp.fields before writing them to ppBase. + brs := getBlockRows() + cs := brs.cs + for _, f := range fpp.fp.fields { + values := getValuesForBlockColumn(columns, f, len(timestamps)) + cs = append(cs, BlockColumn{ + Name: f, + Values: values, + }) + } + fpp.ppBase.writeBlock(workerID, timestamps, cs) + brs.cs = cs + putBlockRows(brs) +} + +func (fpp *fieldsPipeProcessor) flush() { + fpp.ppBase.flush() + fpp.cancel() +} + func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) { var fields []string for { @@ -87,6 +174,27 @@ type statsFunc interface { // neededFields returns the needed fields for calculating the given stats neededFields() []string + + // newStatsFuncProcessor must create new statsFuncProcessor for calculating stats for the given statsFunc. + newStatsFuncProcessor() statsFuncProcessor +} + +// statsFuncProcessor must process stats for some statsFunc. +// +// All the statsFuncProcessor methods are called from a single goroutine at a time, +// so there is no need in the internal synchronization. +type statsFuncProcessor interface { + // updateStatsForAllRows must update statsFuncProcessor stats from all the rows. + updateStatsForAllRows(timestamps []int64, columns []BlockColumn) + + // updateStatsForRow must update statsFuncProcessor stats from the row at rowIndex. + updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int) + + // mergeState must merge sfp state into statsFuncProcessor state. + mergeState(sfp statsFuncProcessor) + + // finalizeStats must return the collected stats from statsFuncProcessor. + finalizeStats() (name, value string) } func (sp *statsPipe) String() string { @@ -106,6 +214,192 @@ func (sp *statsPipe) String() string { return s } +func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { + shards := make([]statsPipeProcessorShard, workersCount) + for i := range shards { + shard := &shards[i] + shard.m = make(map[string]*statsPipeGroup) + shard.funcs = sp.funcs + } + + return &statsPipeProcessor{ + sp: sp, + stopCh: stopCh, + cancel: cancel, + ppBase: ppBase, + + shards: shards, + } +} + +type statsPipeProcessor struct { + sp *statsPipe + stopCh <-chan struct{} + cancel func() + ppBase pipeProcessor + + shards []statsPipeProcessorShard +} + +type statsPipeProcessorShard struct { + statsPipeProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with + // 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(statsPipeProcessorShardNopad{})%128]byte +} + +type statsPipeProcessorShardNopad struct { + m map[string]*statsPipeGroup + funcs []statsFunc + + columnIdxs []int + keyBuf []byte +} + +func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup { + spg := shard.m[string(key)] + if spg != nil { + return spg + } + sfps := make([]statsFuncProcessor, len(shard.funcs)) + for i, f := range shard.funcs { + sfps[i] = f.newStatsFuncProcessor() + } + spg = &statsPipeGroup{ + sfps: sfps, + } + shard.m[string(key)] = spg + return spg +} + +type statsPipeGroup struct { + sfps []statsFuncProcessor +} + +func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) { + shard := &spp.shards[workerID] + + if len(spp.sp.byFields) == 0 { + // Fast path - pass all the rows to a single group + spg := shard.getStatsPipeGroup(nil) + for _, sfp := range spg.sfps { + sfp.updateStatsForAllRows(timestamps, columns) + } + return + } + + // Slow path - update per-row stats + + // Pre-calculate column indexes for byFields in order to speed up building group key in the loop below. + columnIdxs := shard.columnIdxs[:0] + for _, f := range spp.sp.byFields { + idx := getBlockColumnIndex(columns, f) + columnIdxs = append(columnIdxs, idx) + } + shard.columnIdxs = columnIdxs + + keyBuf := shard.keyBuf + for i := range timestamps { + // Construct key for the by (...) fields + keyBuf = keyBuf[:0] + for _, idx := range columnIdxs { + v := "" + if idx >= 0 { + v = columns[idx].Values[i] + } + keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v)) + } + + spg := shard.getStatsPipeGroup(keyBuf) + for _, sfp := range spg.sfps { + sfp.updateStatsForRow(timestamps, columns, i) + } + } + shard.keyBuf = keyBuf +} + +func (spp *statsPipeProcessor) flush() { + defer func() { + spp.ppBase.flush() + spp.cancel() + }() + + // Merge states across shards + shards := spp.shards + m := shards[0].m + shards = shards[1:] + for i := range shards { + shard := &shards[i] + for key, spg := range shard.m { + // shard.m may be quite big, so this loop can take a lot of time and CPU. + // Stop processing data as soon as stopCh is closed without wasting CPU time. + select { + case <-spp.stopCh: + return + default: + } + + spgBase := m[key] + if spgBase == nil { + m[key] = spg + } else { + for i, sfp := range spgBase.sfps { + sfp.mergeState(spg.sfps[i]) + } + } + } + } + + // Write per-group states to ppBase + byFields := spp.sp.byFields + var values []string + var columns []BlockColumn + for key, spg := range m { + // m may be quite big, so this loop can take a lot of time and CPU. + // Stop processing data as soon as stopCh is closed without wasting CPU time. + select { + case <-spp.stopCh: + return + default: + } + + // Unmarshal values for byFields from key. + values = values[:0] + keyBuf := bytesutil.ToUnsafeBytes(key) + for len(keyBuf) > 0 { + tail, v, err := encoding.UnmarshalBytes(keyBuf) + if err != nil { + logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q: %w", keyBuf, err) + } + values = append(values, bytesutil.ToUnsafeString(v)) + keyBuf = tail + } + if len(values) != len(byFields) { + logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields)) + } + + // construct columns for byFields + columns = columns[:0] + for i, f := range byFields { + columns = append(columns, BlockColumn{ + Name: f, + Values: values[i : i+1], + }) + } + + // construct columns for stats functions + for _, sfp := range spg.sfps { + name, value := sfp.finalizeStats() + columns = append(columns, BlockColumn{ + Name: name, + Values: []string{value}, + }) + } + spp.ppBase.writeBlock(0, []int64{0}, columns) + } +} + func (sp *statsPipe) neededFields() []string { var neededFields []string m := make(map[string]struct{}) @@ -184,10 +478,39 @@ func (sfc *statsFuncCount) String() string { return "count(" + fieldNamesString(fields) + ") as " + quoteTokenIfNeeded(sfc.resultName) } +func (sfc *statsFuncCount) newStatsFuncProcessor() statsFuncProcessor { + return &statsFuncCountProcessor{ + sfc: sfc, + } +} + func (sfc *statsFuncCount) neededFields() []string { return getFieldsIgnoreStar(sfc.fields) } +type statsFuncCountProcessor struct { + sfc *statsFuncCount + rowsCount uint64 +} + +func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, _ []BlockColumn) { + sfcp.rowsCount += uint64(len(timestamps)) +} + +func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, _ []BlockColumn, _ int) { + sfcp.rowsCount++ +} + +func (sfcp *statsFuncCountProcessor) mergeState(sfp statsFuncProcessor) { + src := sfp.(*statsFuncCountProcessor) + sfcp.rowsCount += src.rowsCount +} + +func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) { + value := strconv.FormatUint(sfcp.rowsCount, 10) + return sfcp.sfc.resultName, value +} + func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) { lex.nextToken() fields, err := parseFieldNamesInParens(lex) diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index b0633862c..8637f86ef 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -1,9 +1,11 @@ package logstorage import ( + "context" "math" "sort" "sync" + "sync/atomic" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" ) @@ -42,8 +44,8 @@ type searchOptions struct { resultColumnNames []string } -// RunQuery runs the given q and calls processBlock for results -func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []BlockColumn)) { +// RunQuery runs the given q and calls writeBlock for results. +func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) { resultColumnNames := q.getResultColumnNames() so := &genericSearchOptions{ tenantIDs: tenantIDs, @@ -52,6 +54,17 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{ } workersCount := cgroup.AvailableCPUs() + + pp := newDefaultPipeProcessor(writeBlock) + stopCh := ctx.Done() + for i := len(q.pipes) - 1; i >= 0; i-- { + p := q.pipes[i] + ctxChild, cancel := context.WithCancel(ctx) + stopCh = ctxChild.Done() + pp = p.newPipeProcessor(workersCount, stopCh, cancel, pp) + ctx = ctxChild + } + s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) { brs := getBlockRows() cs := brs.cs @@ -62,12 +75,13 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{ Values: br.getColumnValues(i), }) } - rowsCount := br.RowsCount() - processBlock(workerID, rowsCount, cs) + pp.writeBlock(workerID, br.timestamps, cs) brs.cs = cs putBlockRows(brs) }) + + pp.flush() } type blockRows struct { @@ -111,6 +125,53 @@ func (c *BlockColumn) reset() { c.Values = nil } +func areSameBlockColumns(columns []BlockColumn, columnNames []string) bool { + if len(columnNames) != len(columns) { + return false + } + for i, name := range columnNames { + if columns[i].Name != name { + return false + } + } + return true +} + +func getBlockColumnIndex(columns []BlockColumn, columnName string) int { + for i, c := range columns { + if c.Name == columnName { + return i + } + } + return -1 +} + +func getValuesForBlockColumn(columns []BlockColumn, columnName string, rowsCount int) []string { + for _, c := range columns { + if c.Name == columnName { + return c.Values + } + } + return getEmptyStrings(rowsCount) +} + +func getEmptyStrings(rowsCount int) []string { + p := emptyStrings.Load() + if p == nil { + values := make([]string, rowsCount) + emptyStrings.Store(&values) + return values + } + values := *p + if n := rowsCount - cap(values); n > 0 { + values = append(values[:cap(values)], make([]string, n)...) + emptyStrings.Store(&values) + } + return values[:rowsCount] +} + +var emptyStrings atomic.Pointer[[]string] + // The number of blocks to search at once by a single worker // // This number must be increased on systems with many CPU cores in order to amortize diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 544e55cc7..49499dfdb 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -1,6 +1,7 @@ package logstorage import ( + "context" "fmt" "regexp" "sync/atomic" @@ -84,11 +85,11 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 0, ProjectID: 0, } - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) }) t.Run("missing-message-text", func(_ *testing.T) { q := mustParseQuery(`foobar`) @@ -96,11 +97,11 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) }) t.Run("matching-tenant-id", func(t *testing.T) { q := mustParseQuery(`tenant.id:*`) @@ -111,14 +112,14 @@ func TestStorageRunQuery(t *testing.T) { } expectedTenantID := tenantID.String() var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, columns []BlockColumn) { + writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) { hasTenantIDColumn := false var columnNames []string for _, c := range columns { if c.Name == "tenant.id" { hasTenantIDColumn = true - if len(c.Values) != rowsCount { - panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), rowsCount)) + if len(c.Values) != len(timestamps) { + panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps))) } for _, v := range c.Values { if v != expectedTenantID { @@ -131,10 +132,10 @@ func TestStorageRunQuery(t *testing.T) { if !hasTenantIDColumn { panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames)) } - rowsCountTotal.Add(uint32(len(columns[0].Values))) + rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -145,10 +146,10 @@ func TestStorageRunQuery(t *testing.T) { t.Run("matching-multiple-tenant-ids", func(t *testing.T) { q := mustParseQuery(`"log message"`) var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - rowsCountTotal.Add(uint32(rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + rowsCountTotal.Add(uint32(len(timestamps))) } - s.RunQuery(allTenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), allTenantIDs, q, writeBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -158,10 +159,10 @@ func TestStorageRunQuery(t *testing.T) { t.Run("matching-in-filter", func(t *testing.T) { q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`) var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - rowsCountTotal.Add(uint32(rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + rowsCountTotal.Add(uint32(len(timestamps))) } - s.RunQuery(allTenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), allTenantIDs, q, writeBlock) expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -170,10 +171,10 @@ func TestStorageRunQuery(t *testing.T) { }) t.Run("stream-filter-mismatch", func(_ *testing.T) { q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`) - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } - s.RunQuery(allTenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), allTenantIDs, q, writeBlock) }) t.Run("matching-stream-id", func(t *testing.T) { for i := 0; i < streamsPerTenant; i++ { @@ -184,14 +185,14 @@ func TestStorageRunQuery(t *testing.T) { } expectedStreamID := fmt.Sprintf("stream_id=%d", i) var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, columns []BlockColumn) { + writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) { hasStreamIDColumn := false var columnNames []string for _, c := range columns { if c.Name == "stream-id" { hasStreamIDColumn = true - if len(c.Values) != rowsCount { - panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), rowsCount)) + if len(c.Values) != len(timestamps) { + panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps))) } for _, v := range c.Values { if v != expectedStreamID { @@ -204,10 +205,10 @@ func TestStorageRunQuery(t *testing.T) { if !hasStreamIDColumn { panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames)) } - rowsCountTotal.Add(uint32(len(columns[0].Values))) + rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) expectedRowsCount := blocksPerStream * rowsPerBlock if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -222,11 +223,11 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - rowsCountTotal.Add(uint32(rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream * 2 if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -242,11 +243,11 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - rowsCountTotal.Add(uint32(rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) expectedRowsCount := streamsPerTenant * blocksPerStream if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -262,11 +263,11 @@ func TestStorageRunQuery(t *testing.T) { ProjectID: 11, } var rowsCountTotal atomic.Uint32 - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - rowsCountTotal.Add(uint32(rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + rowsCountTotal.Add(uint32(len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) expectedRowsCount := blocksPerStream if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) { @@ -281,11 +282,11 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) }) t.Run("missing-time-range", func(_ *testing.T) { minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9 @@ -295,11 +296,11 @@ func TestStorageRunQuery(t *testing.T) { AccountID: 1, ProjectID: 11, } - processBlock := func(_ uint, rowsCount int, _ []BlockColumn) { - panic(fmt.Errorf("unexpected match for %d rows", rowsCount)) + writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) { + panic(fmt.Errorf("unexpected match for %d rows", len(timestamps))) } tenantIDs := []TenantID{tenantID} - s.RunQuery(tenantIDs, q, nil, processBlock) + s.RunQuery(context.Background(), tenantIDs, q, writeBlock) }) // Close the storage and delete its data