Aliaksandr Valialkin 2024-04-26 23:47:50 +02:00
parent bb409ad0bf
commit 640b18cd66
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 448 additions and 59 deletions


@@ -1,6 +1,7 @@
 package logsql
 
 import (
+	"context"
 	"net/http"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
@@ -17,8 +18,8 @@ var (
 		"too big value for this flag may result in high memory usage since the sorting is performed in memory")
 )
 
-// ProcessQueryRequest handles /select/logsql/query request
-func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}, cancel func()) {
+// ProcessQueryRequest handles /select/logsql/query request.
+func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 	// Extract tenantID
 	tenantID, err := logstorage.GetTenantIDFromRequest(r)
 	if err != nil {
@@ -42,14 +43,18 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
 	sw := getSortWriter()
 	sw.Init(w, maxSortBufferSize.IntN(), limit)
 	tenantIDs := []logstorage.TenantID{tenantID}
-	vlstorage.RunQuery(tenantIDs, q, stopCh, func(_ uint, rowsCount int, columns []logstorage.BlockColumn) {
+
+	ctxWithCancel, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
 		if len(columns) == 0 {
 			return
 		}
 
 		bb := blockResultPool.Get()
-		for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
-			WriteJSONRow(bb, columns, rowIdx)
+		for i := range timestamps {
+			WriteJSONRow(bb, columns, i)
 		}
 
 		if !sw.TryWrite(bb.B) {
@@ -57,7 +62,10 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
 		}
 		blockResultPool.Put(bb)
-	})
+	}
+
+	vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
+
 	sw.FinalFlush()
 	putSortWriter(sw)
 }
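
Note: the handler above now derives cancellation from the request's `context.Context` instead of threading an explicit `stopCh` and `cancel` func through every layer. A minimal, self-contained sketch of this pattern (hypothetical handler and names, not code from this commit):

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// runQuery stands in for vlstorage.RunQuery: it checks ctx.Done() between
// blocks and stops early once the context is canceled.
func runQuery(ctx context.Context, writeBlock func(rows int)) {
	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			return // client disconnected or the handler canceled the query
		default:
		}
		writeBlock(10)
	}
}

func queryHandler(w http.ResponseWriter, r *http.Request) {
	// r.Context() is canceled automatically when the client goes away;
	// WithCancel additionally lets the handler stop the query on its own,
	// e.g. once a row limit is reached (which is what sortWriter uses
	// cancel() for in this commit).
	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	total := 0
	runQuery(ctx, func(rows int) { total += rows })
	fmt.Fprintf(w, "rows: %d\n", total)
}

func main() {
	http.HandleFunc("/select/logsql/query", queryHandler)
	_ = http.ListenAndServe(":8080", nil) // example port only
}
```
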


@@ -1,7 +1,6 @@
 package vlselect
 
 import (
-	"context"
 	"embed"
 	"flag"
 	"fmt"
@@ -141,15 +140,11 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
 		}
 	}
 
-	ctxWithCancel, cancel := context.WithCancel(ctx)
-	defer cancel()
-	stopCh = ctxWithCancel.Done()
-
 	switch {
 	case path == "/logsql/query":
 		logsqlQueryRequests.Inc()
 		httpserver.EnableCORS(w, r)
-		logsql.ProcessQueryRequest(w, r, stopCh, cancel)
+		logsql.ProcessQueryRequest(ctx, w, r)
 		return true
 	default:
 		return false


@@ -1,6 +1,7 @@
 package vlstorage
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"net/http"
@@ -99,9 +100,9 @@ func MustAddRows(lr *logstorage.LogRows) {
 	strg.MustAddRows(lr)
}
 
-// RunQuery runs the given q and calls processBlock for the returned data blocks
-func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []logstorage.BlockColumn)) {
-	strg.RunQuery(tenantIDs, q, stopCh, processBlock)
+// RunQuery runs the given q and calls writeBlock for the returned data blocks.
+func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) {
+	strg.RunQuery(ctx, tenantIDs, q, writeBlock)
 }
 
 func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {


@@ -2,13 +2,59 @@ package logstorage
 
 import (
 	"fmt"
+	"slices"
+	"strconv"
 	"strings"
+	"unsafe"
 
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
 type pipe interface {
+	// String returns the string representation of the pipe.
 	String() string
+
+	// newPipeProcessor must return a new pipeProcessor for the given ppBase.
+	//
+	// workersCount is the number of goroutine workers, which will call the writeBlock() method.
+	//
+	// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
+	// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
+	//
+	// The returned pipeProcessor may call cancel() at any time in order to stop ppBase.
+	newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
+}
+
+// pipeProcessor must process a single pipe.
+type pipeProcessor interface {
+	// writeBlock must write the given block of data to the given pipeProcessor.
+	//
+	// workerID is the id of the worker goroutine, which called writeBlock.
+	// It is in the range 0 ... workersCount-1 .
+	//
+	// It is forbidden to hold references to columns after returning from writeBlock, since the caller re-uses columns.
+	writeBlock(workerID uint, timestamps []int64, columns []BlockColumn)
+
+	// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
+	//
+	// The pipeProcessor must call ppBase.flush() and the cancel() func passed to newPipeProcessor before returning from flush.
+	flush()
+}
+
+type defaultPipeProcessor func(workerID uint, timestamps []int64, columns []BlockColumn)
+
+func newDefaultPipeProcessor(writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) pipeProcessor {
+	return defaultPipeProcessor(writeBlock)
+}
+
+func (dpp defaultPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
+	dpp(workerID, timestamps, columns)
+}
+
+func (dpp defaultPipeProcessor) flush() {
+	// Nothing to do
 }
 
 func parsePipes(lex *lexer) ([]pipe, error) {
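
Note: pipeProcessors are meant to chain: each one wraps a base processor (`ppBase`), and `defaultPipeProcessor` terminates the chain by adapting a plain callback. A standalone toy illustration of that chaining, with signatures reduced to `[]string` rows (hypothetical `upperPipeProcessor`; not code from this commit):

```go
package main

import "fmt"

// Simplified stand-in for the pipeProcessor interface above.
type pipeProcessor interface {
	writeBlock(workerID uint, rows []string)
	flush()
}

// defaultPipeProcessor mirrors the adapter in the diff: it turns a plain
// callback into the terminal pipeProcessor of the chain.
type defaultPipeProcessor func(workerID uint, rows []string)

func (dpp defaultPipeProcessor) writeBlock(workerID uint, rows []string) { dpp(workerID, rows) }
func (dpp defaultPipeProcessor) flush()                                  {}

// upperPipeProcessor is a toy pipe: it transforms each row and forwards
// the block to ppBase, just like fieldsPipeProcessor forwards its output.
type upperPipeProcessor struct{ ppBase pipeProcessor }

func (upp *upperPipeProcessor) writeBlock(workerID uint, rows []string) {
	out := make([]string, len(rows))
	for i, r := range rows {
		out[i] = "[" + r + "]"
	}
	upp.ppBase.writeBlock(workerID, out)
}

// flush propagates down the chain, as the pipeProcessor contract requires.
func (upp *upperPipeProcessor) flush() { upp.ppBase.flush() }

func main() {
	sink := defaultPipeProcessor(func(_ uint, rows []string) { fmt.Println(rows) })
	var pp pipeProcessor = &upperPipeProcessor{ppBase: sink}
	pp.writeBlock(0, []string{"foo", "bar"}) // prints [[foo] [bar]]
	pp.flush()
}
```
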
@@ -52,6 +98,47 @@ func (fp *fieldsPipe) String() string {
 	return "fields " + fieldNamesString(fp.fields)
 }
 
+func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+	return &fieldsPipeProcessor{
+		fp:     fp,
+		cancel: cancel,
+		ppBase: ppBase,
+	}
+}
+
+type fieldsPipeProcessor struct {
+	fp     *fieldsPipe
+	cancel func()
+	ppBase pipeProcessor
+}
+
+func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
+	if slices.Contains(fpp.fp.fields, "*") || areSameBlockColumns(columns, fpp.fp.fields) {
+		// Fast path - there is no need for additional transformations before writing the block to ppBase.
+		fpp.ppBase.writeBlock(workerID, timestamps, columns)
+		return
+	}
+
+	// Slow path - construct columns for fpp.fp.fields before writing them to ppBase.
+	brs := getBlockRows()
+	cs := brs.cs
+	for _, f := range fpp.fp.fields {
+		values := getValuesForBlockColumn(columns, f, len(timestamps))
+		cs = append(cs, BlockColumn{
+			Name:   f,
+			Values: values,
+		})
+	}
+	fpp.ppBase.writeBlock(workerID, timestamps, cs)
+	brs.cs = cs
+	putBlockRows(brs)
+}
+
+func (fpp *fieldsPipeProcessor) flush() {
+	fpp.ppBase.flush()
+	fpp.cancel()
+}
+
 func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
 	var fields []string
 	for {
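
Note: `fieldsPipeProcessor`'s slow path materializes one output column per requested field, substituting empty values for columns missing from the block. A standalone sketch of that selection logic (simplified: the real code pools slices via `getBlockRows` and serves missing columns from a shared empty-strings cache):

```go
package main

import "fmt"

type BlockColumn struct {
	Name   string
	Values []string
}

// selectFields builds one output column per requested field; a field with
// no matching input column yields empty values (cf. getValuesForBlockColumn).
func selectFields(columns []BlockColumn, fields []string, rowsCount int) []BlockColumn {
	out := make([]BlockColumn, 0, len(fields))
	for _, f := range fields {
		values := make([]string, rowsCount) // empty strings by default
		for _, c := range columns {
			if c.Name == f {
				values = c.Values
				break
			}
		}
		out = append(out, BlockColumn{Name: f, Values: values})
	}
	return out
}

func main() {
	block := []BlockColumn{
		{Name: "_msg", Values: []string{"a", "b"}},
		{Name: "level", Values: []string{"info", "warn"}},
	}
	// `... | fields level, host` keeps level and emits empty host values.
	fmt.Println(selectFields(block, []string{"level", "host"}, 2))
}
```
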
@@ -87,6 +174,27 @@ type statsFunc interface {
 	// neededFields returns the needed fields for calculating the given stats
 	neededFields() []string
+
+	// newStatsFuncProcessor must create a new statsFuncProcessor for calculating stats for the given statsFunc.
+	newStatsFuncProcessor() statsFuncProcessor
+}
+
+// statsFuncProcessor must process stats for some statsFunc.
+//
+// All the statsFuncProcessor methods are called from a single goroutine at a time,
+// so there is no need for internal synchronization.
+type statsFuncProcessor interface {
+	// updateStatsForAllRows must update statsFuncProcessor stats from all the rows.
+	updateStatsForAllRows(timestamps []int64, columns []BlockColumn)
+
+	// updateStatsForRow must update statsFuncProcessor stats from the row at rowIndex.
+	updateStatsForRow(timestamps []int64, columns []BlockColumn, rowIndex int)
+
+	// mergeState must merge sfp state into statsFuncProcessor state.
+	mergeState(sfp statsFuncProcessor)
+
+	// finalizeStats must return the collected stats from statsFuncProcessor.
+	finalizeStats() (name, value string)
 }
 
 func (sp *statsPipe) String() string {
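
Note: the two update methods correspond to the two execution paths of the stats pipe: `updateStatsForAllRows` for the no-grouping fast path, where a whole block goes to a single group, and `updateStatsForRow` for the grouped path, where rows are dispatched to per-group processors one at a time. A toy illustration (hypothetical `rowCounter`, not from this commit):

```go
package main

import "fmt"

// rowCounter shows the two update paths of a statsFuncProcessor-like type.
type rowCounter struct{ n uint64 }

func (rc *rowCounter) updateStatsForAllRows(timestamps []int64) { rc.n += uint64(len(timestamps)) }
func (rc *rowCounter) updateStatsForRow()                       { rc.n++ }

func main() {
	timestamps := []int64{10, 20, 30}

	// Fast path: no `by (...)` fields - the whole block hits one group.
	fast := &rowCounter{}
	fast.updateStatsForAllRows(timestamps)

	// Slow path: each row is routed to its own group's processor.
	slow := &rowCounter{}
	for range timestamps {
		slow.updateStatsForRow()
	}

	fmt.Println(fast.n, slow.n) // 3 3 - both paths count the same rows
}
```
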
@@ -106,6 +214,192 @@
 	return s
 }
 
+func (sp *statsPipe) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+	shards := make([]statsPipeProcessorShard, workersCount)
+	for i := range shards {
+		shard := &shards[i]
+		shard.m = make(map[string]*statsPipeGroup)
+		shard.funcs = sp.funcs
+	}
+
+	return &statsPipeProcessor{
+		sp:     sp,
+		stopCh: stopCh,
+		cancel: cancel,
+		ppBase: ppBase,
+
+		shards: shards,
+	}
+}
+
+type statsPipeProcessor struct {
+	sp     *statsPipe
+	stopCh <-chan struct{}
+	cancel func()
+	ppBase pipeProcessor
+
+	shards []statsPipeProcessorShard
+}
+
+type statsPipeProcessorShard struct {
+	statsPipeProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with
+	// 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(statsPipeProcessorShardNopad{})%128]byte
+}
+
+type statsPipeProcessorShardNopad struct {
+	m     map[string]*statsPipeGroup
+	funcs []statsFunc
+
+	columnIdxs []int
+	keyBuf     []byte
+}
+
+func (shard *statsPipeProcessorShard) getStatsPipeGroup(key []byte) *statsPipeGroup {
+	spg := shard.m[string(key)]
+	if spg != nil {
+		return spg
+	}
+
+	sfps := make([]statsFuncProcessor, len(shard.funcs))
+	for i, f := range shard.funcs {
+		sfps[i] = f.newStatsFuncProcessor()
+	}
+	spg = &statsPipeGroup{
+		sfps: sfps,
+	}
+	shard.m[string(key)] = spg
+	return spg
+}
+
+type statsPipeGroup struct {
+	sfps []statsFuncProcessor
+}
+
+func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
+	shard := &spp.shards[workerID]
+
+	if len(spp.sp.byFields) == 0 {
+		// Fast path - pass all the rows to a single group
+		spg := shard.getStatsPipeGroup(nil)
+		for _, sfp := range spg.sfps {
+			sfp.updateStatsForAllRows(timestamps, columns)
+		}
+		return
+	}
+
+	// Slow path - update per-row stats
+	// Pre-calculate column indexes for byFields in order to speed up building the group key in the loop below.
+	columnIdxs := shard.columnIdxs[:0]
+	for _, f := range spp.sp.byFields {
+		idx := getBlockColumnIndex(columns, f)
+		columnIdxs = append(columnIdxs, idx)
+	}
+	shard.columnIdxs = columnIdxs
+
+	keyBuf := shard.keyBuf
+	for i := range timestamps {
+		// Construct key for the by (...) fields
+		keyBuf = keyBuf[:0]
+		for _, idx := range columnIdxs {
+			v := ""
+			if idx >= 0 {
+				v = columns[idx].Values[i]
+			}
+			keyBuf = encoding.MarshalBytes(keyBuf, bytesutil.ToUnsafeBytes(v))
+		}
+		spg := shard.getStatsPipeGroup(keyBuf)
+		for _, sfp := range spg.sfps {
+			sfp.updateStatsForRow(timestamps, columns, i)
+		}
+	}
+	shard.keyBuf = keyBuf
+}
+
+func (spp *statsPipeProcessor) flush() {
+	defer func() {
+		spp.ppBase.flush()
+		spp.cancel()
+	}()
+
+	// Merge states across shards
+	shards := spp.shards
+	m := shards[0].m
+	shards = shards[1:]
+	for i := range shards {
+		shard := &shards[i]
+		for key, spg := range shard.m {
+			// shard.m may be quite big, so this loop can take a lot of time and CPU.
+			// Stop processing data as soon as stopCh is closed without wasting CPU time.
+			select {
+			case <-spp.stopCh:
+				return
+			default:
+			}
+
+			spgBase := m[key]
+			if spgBase == nil {
+				m[key] = spg
+			} else {
+				for i, sfp := range spgBase.sfps {
+					sfp.mergeState(spg.sfps[i])
+				}
+			}
+		}
+	}
+
+	// Write per-group states to ppBase
+	byFields := spp.sp.byFields
+	var values []string
+	var columns []BlockColumn
+	for key, spg := range m {
+		// m may be quite big, so this loop can take a lot of time and CPU.
+		// Stop processing data as soon as stopCh is closed without wasting CPU time.
+		select {
+		case <-spp.stopCh:
+			return
+		default:
+		}
+
+		// Unmarshal values for byFields from key.
+		values = values[:0]
+		keyBuf := bytesutil.ToUnsafeBytes(key)
+		for len(keyBuf) > 0 {
+			tail, v, err := encoding.UnmarshalBytes(keyBuf)
+			if err != nil {
+				logger.Panicf("BUG: cannot unmarshal value from keyBuf=%q: %s", keyBuf, err)
+			}
+			values = append(values, bytesutil.ToUnsafeString(v))
+			keyBuf = tail
+		}
+		if len(values) != len(byFields) {
+			logger.Panicf("BUG: unexpected number of values decoded from keyBuf; got %d; want %d", len(values), len(byFields))
+		}
+
+		// construct columns for byFields
+		columns = columns[:0]
+		for i, f := range byFields {
+			columns = append(columns, BlockColumn{
+				Name:   f,
+				Values: values[i : i+1],
+			})
+		}
+
+		// construct columns for stats functions
+		for _, sfp := range spg.sfps {
+			name, value := sfp.finalizeStats()
+			columns = append(columns, BlockColumn{
+				Name:   name,
+				Values: []string{value},
+			})
+		}
+		spp.ppBase.writeBlock(0, []int64{0}, columns)
+	}
+}
+
 func (sp *statsPipe) neededFields() []string {
 	var neededFields []string
 	m := make(map[string]struct{})
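
Note: `writeBlock` above builds each group key by concatenating length-prefixed by-field values via `encoding.MarshalBytes`, and `flush` decodes the key back into column values. A standalone sketch of that roundtrip using uvarint length prefixes (the actual lib/encoding helpers may differ in detail):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// marshalKey appends each value with a uvarint length prefix, so the
// concatenation stays unambiguous even when values contain separators.
func marshalKey(dst []byte, values []string) []byte {
	for _, v := range values {
		dst = binary.AppendUvarint(dst, uint64(len(v)))
		dst = append(dst, v...)
	}
	return dst
}

// unmarshalKey reverses marshalKey, mirroring the UnmarshalBytes loop in flush.
func unmarshalKey(src []byte) ([]string, error) {
	var values []string
	for len(src) > 0 {
		n, size := binary.Uvarint(src)
		if size <= 0 || uint64(len(src)-size) < n {
			return nil, fmt.Errorf("cannot unmarshal value from key")
		}
		src = src[size:]
		values = append(values, string(src[:n]))
		src = src[n:]
	}
	return values, nil
}

func main() {
	// Group key for `stats by (level, host)`: one entry per by-field;
	// missing fields are encoded as empty strings to keep positions aligned.
	key := marshalKey(nil, []string{"error", "host-1"})
	values, err := unmarshalKey(key)
	fmt.Println(values, err) // [error host-1] <nil>
}
```
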
@@ -184,10 +478,39 @@ func (sfc *statsFuncCount) String() string {
 	return "count(" + fieldNamesString(fields) + ") as " + quoteTokenIfNeeded(sfc.resultName)
 }
 
+func (sfc *statsFuncCount) newStatsFuncProcessor() statsFuncProcessor {
+	return &statsFuncCountProcessor{
+		sfc: sfc,
+	}
+}
+
 func (sfc *statsFuncCount) neededFields() []string {
 	return getFieldsIgnoreStar(sfc.fields)
 }
 
+type statsFuncCountProcessor struct {
+	sfc *statsFuncCount
+
+	rowsCount uint64
+}
+
+func (sfcp *statsFuncCountProcessor) updateStatsForAllRows(timestamps []int64, _ []BlockColumn) {
+	sfcp.rowsCount += uint64(len(timestamps))
+}
+
+func (sfcp *statsFuncCountProcessor) updateStatsForRow(_ []int64, _ []BlockColumn, _ int) {
+	sfcp.rowsCount++
+}
+
+func (sfcp *statsFuncCountProcessor) mergeState(sfp statsFuncProcessor) {
+	src := sfp.(*statsFuncCountProcessor)
+	sfcp.rowsCount += src.rowsCount
+}
+
+func (sfcp *statsFuncCountProcessor) finalizeStats() (string, string) {
+	value := strconv.FormatUint(sfcp.rowsCount, 10)
+	return sfcp.sfc.resultName, value
+}
+
 func parseStatsFuncCount(lex *lexer) (*statsFuncCount, error) {
 	lex.nextToken()
 	fields, err := parseFieldNamesInParens(lex)
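
Note: `statsFuncCount` is the only stats function with a processor at this point in the series. Purely for illustration, here is a hypothetical sketch of how another function could fill in the same processor shape (all names invented, signatures simplified):

```go
package main

import (
	"fmt"
	"strconv"
)

// sumProcessor is a made-up example following the statsFuncCountProcessor
// pattern: accumulate per shard, merge shard states, then format the result.
type sumProcessor struct {
	field string
	sum   float64
}

func (sp *sumProcessor) updateValue(v string) {
	f, err := strconv.ParseFloat(v, 64)
	if err == nil {
		sp.sum += f // non-numeric values are silently skipped
	}
}

func (sp *sumProcessor) mergeState(src *sumProcessor) { sp.sum += src.sum }

func (sp *sumProcessor) finalizeStats() (string, string) {
	return "sum(" + sp.field + ")", strconv.FormatFloat(sp.sum, 'f', -1, 64)
}

func main() {
	p := &sumProcessor{field: "duration"}
	for _, v := range []string{"1.5", "2", "oops", "0.5"} {
		p.updateValue(v)
	}
	name, value := p.finalizeStats()
	fmt.Println(name, "=", value) // sum(duration) = 4
}
```
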


@@ -1,9 +1,11 @@
 package logstorage
 
 import (
+	"context"
 	"math"
 	"sort"
 	"sync"
+	"sync/atomic"
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
 )
@@ -42,8 +44,8 @@ type searchOptions struct {
 	resultColumnNames []string
 }
 
-// RunQuery runs the given q and calls processBlock for results
-func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(workerID uint, rowsCount int, columns []BlockColumn)) {
+// RunQuery runs the given q and calls writeBlock for results.
+func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) {
 	resultColumnNames := q.getResultColumnNames()
 	so := &genericSearchOptions{
 		tenantIDs:         tenantIDs,
@@ -52,6 +54,17 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
 	}
 	workersCount := cgroup.AvailableCPUs()
 
+	pp := newDefaultPipeProcessor(writeBlock)
+	stopCh := ctx.Done()
+	for i := len(q.pipes) - 1; i >= 0; i-- {
+		p := q.pipes[i]
+		ctxChild, cancel := context.WithCancel(ctx)
+		stopCh = ctxChild.Done()
+		pp = p.newPipeProcessor(workersCount, stopCh, cancel, pp)
+		ctx = ctxChild
+	}
+
 	s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) {
 		brs := getBlockRows()
 		cs := brs.cs
@@ -62,12 +75,13 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
 				Values: br.getColumnValues(i),
 			})
 		}
-		rowsCount := br.RowsCount()
-		processBlock(workerID, rowsCount, cs)
+		pp.writeBlock(workerID, br.timestamps, cs)
 		brs.cs = cs
 		putBlockRows(brs)
 	})
+
+	pp.flush()
 }
 
 type blockRows struct {
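
Note: `RunQuery` iterates `q.pipes` in reverse because each `newPipeProcessor` call wraps the processor built so far: the last pipe must be constructed first so that data flows first pipe → … → last pipe → `writeBlock`. A toy demonstration of why the reverse build order yields forward data flow:

```go
package main

import "fmt"

type proc func(s string)

// wrap builds a processor that tags the data and forwards it to next,
// standing in for p.newPipeProcessor(..., pp).
func wrap(name string, next proc) proc {
	return func(s string) { next(s + " -> " + name) }
}

func main() {
	pipes := []string{"fields", "stats"} // in query order
	pp := proc(func(s string) { fmt.Println(s + " -> writeBlock") })
	for i := len(pipes) - 1; i >= 0; i-- {
		pp = wrap(pipes[i], pp)
	}
	pp("block") // block -> fields -> stats -> writeBlock
}
```
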
@@ -111,6 +125,53 @@ func (c *BlockColumn) reset() {
 	c.Values = nil
 }
 
+func areSameBlockColumns(columns []BlockColumn, columnNames []string) bool {
+	if len(columnNames) != len(columns) {
+		return false
+	}
+	for i, name := range columnNames {
+		if columns[i].Name != name {
+			return false
+		}
+	}
+	return true
+}
+
+func getBlockColumnIndex(columns []BlockColumn, columnName string) int {
+	for i, c := range columns {
+		if c.Name == columnName {
+			return i
+		}
+	}
+	return -1
+}
+
+func getValuesForBlockColumn(columns []BlockColumn, columnName string, rowsCount int) []string {
+	for _, c := range columns {
+		if c.Name == columnName {
+			return c.Values
+		}
+	}
+	return getEmptyStrings(rowsCount)
+}
+
+func getEmptyStrings(rowsCount int) []string {
+	p := emptyStrings.Load()
+	if p == nil {
+		values := make([]string, rowsCount)
+		emptyStrings.Store(&values)
+		return values
+	}
+	values := *p
+	if n := rowsCount - cap(values); n > 0 {
+		values = append(values[:cap(values)], make([]string, n)...)
+		emptyStrings.Store(&values)
+	}
+	return values[:rowsCount]
+}
+
+var emptyStrings atomic.Pointer[[]string]
+
 // The number of blocks to search at once by a single worker
 //
 // This number must be increased on systems with many CPU cores in order to amortize
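
Note: `getEmptyStrings` keeps a single shared, grow-only all-empty slice behind an atomic pointer, so blocks that lack a requested field can be served without a per-block allocation; callers must treat the returned slice as read-only. The same logic, extracted into a runnable standalone form:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var emptyStrings atomic.Pointer[[]string]

// getEmptyStrings returns rowsCount empty strings, reusing (and growing)
// one shared backing slice. Callers must not write to the result.
func getEmptyStrings(rowsCount int) []string {
	p := emptyStrings.Load()
	if p == nil {
		values := make([]string, rowsCount)
		emptyStrings.Store(&values)
		return values
	}
	values := *p
	if n := rowsCount - cap(values); n > 0 {
		values = append(values[:cap(values)], make([]string, n)...)
		emptyStrings.Store(&values)
	}
	return values[:rowsCount]
}

func main() {
	a := getEmptyStrings(2)
	b := getEmptyStrings(4) // grows the shared slice once
	c := getEmptyStrings(3) // served from the cache, no allocation
	fmt.Println(len(a), len(b), len(c)) // 2 4 3
}
```
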


@@ -1,6 +1,7 @@
 package logstorage
 
 import (
+	"context"
 	"fmt"
 	"regexp"
 	"sync/atomic"
@@ -84,11 +85,11 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 0,
 			ProjectID: 0,
 		}
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 	})
 	t.Run("missing-message-text", func(_ *testing.T) {
 		q := mustParseQuery(`foobar`)
@@ -96,11 +97,11 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 	})
 	t.Run("matching-tenant-id", func(t *testing.T) {
 		q := mustParseQuery(`tenant.id:*`)
@@ -111,14 +112,14 @@ func TestStorageRunQuery(t *testing.T) {
 		}
 		expectedTenantID := tenantID.String()
 		var rowsCountTotal atomic.Uint32
-		processBlock := func(_ uint, rowsCount int, columns []BlockColumn) {
+		writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) {
 			hasTenantIDColumn := false
 			var columnNames []string
 			for _, c := range columns {
 				if c.Name == "tenant.id" {
 					hasTenantIDColumn = true
-					if len(c.Values) != rowsCount {
-						panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), rowsCount))
+					if len(c.Values) != len(timestamps) {
+						panic(fmt.Errorf("unexpected number of rows in column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps)))
 					}
 					for _, v := range c.Values {
 						if v != expectedTenantID {
@@ -131,10 +132,10 @@ func TestStorageRunQuery(t *testing.T) {
 			if !hasTenantIDColumn {
 				panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
 			}
-			rowsCountTotal.Add(uint32(len(columns[0].Values)))
+			rowsCountTotal.Add(uint32(len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream * rowsPerBlock
 		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -145,10 +146,10 @@ func TestStorageRunQuery(t *testing.T) {
 	t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
 		q := mustParseQuery(`"log message"`)
 		var rowsCountTotal atomic.Uint32
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			rowsCountTotal.Add(uint32(rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			rowsCountTotal.Add(uint32(len(timestamps)))
 		}
-		s.RunQuery(allTenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)
 
 		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
 		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -158,10 +159,10 @@ func TestStorageRunQuery(t *testing.T) {
 	t.Run("matching-in-filter", func(t *testing.T) {
 		q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
 		var rowsCountTotal atomic.Uint32
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			rowsCountTotal.Add(uint32(rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			rowsCountTotal.Add(uint32(len(timestamps)))
 		}
-		s.RunQuery(allTenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)
 
 		expectedRowsCount := tenantsCount * streamsPerTenant * blocksPerStream * rowsPerBlock
 		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -170,10 +171,10 @@ func TestStorageRunQuery(t *testing.T) {
 	})
 	t.Run("stream-filter-mismatch", func(_ *testing.T) {
 		q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
 		}
-		s.RunQuery(allTenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), allTenantIDs, q, writeBlock)
 	})
 	t.Run("matching-stream-id", func(t *testing.T) {
 		for i := 0; i < streamsPerTenant; i++ {
@@ -184,14 +185,14 @@ func TestStorageRunQuery(t *testing.T) {
 			}
 			expectedStreamID := fmt.Sprintf("stream_id=%d", i)
 			var rowsCountTotal atomic.Uint32
-			processBlock := func(_ uint, rowsCount int, columns []BlockColumn) {
+			writeBlock := func(_ uint, timestamps []int64, columns []BlockColumn) {
 				hasStreamIDColumn := false
 				var columnNames []string
 				for _, c := range columns {
 					if c.Name == "stream-id" {
 						hasStreamIDColumn = true
-						if len(c.Values) != rowsCount {
-							panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), rowsCount))
+						if len(c.Values) != len(timestamps) {
+							panic(fmt.Errorf("unexpected number of rows for column %q; got %d; want %d", c.Name, len(c.Values), len(timestamps)))
 						}
 						for _, v := range c.Values {
 							if v != expectedStreamID {
@@ -204,10 +205,10 @@ func TestStorageRunQuery(t *testing.T) {
 				if !hasStreamIDColumn {
 					panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
 				}
-				rowsCountTotal.Add(uint32(len(columns[0].Values)))
+				rowsCountTotal.Add(uint32(len(timestamps)))
 			}
 			tenantIDs := []TenantID{tenantID}
-			s.RunQuery(tenantIDs, q, nil, processBlock)
+			s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 
 			expectedRowsCount := blocksPerStream * rowsPerBlock
 			if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -222,11 +223,11 @@ func TestStorageRunQuery(t *testing.T) {
 			ProjectID: 11,
 		}
 		var rowsCountTotal atomic.Uint32
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			rowsCountTotal.Add(uint32(rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			rowsCountTotal.Add(uint32(len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream * 2
 		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -242,11 +243,11 @@ func TestStorageRunQuery(t *testing.T) {
 			ProjectID: 11,
 		}
 		var rowsCountTotal atomic.Uint32
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			rowsCountTotal.Add(uint32(rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			rowsCountTotal.Add(uint32(len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 
 		expectedRowsCount := streamsPerTenant * blocksPerStream
 		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -262,11 +263,11 @@ func TestStorageRunQuery(t *testing.T) {
 			ProjectID: 11,
 		}
 		var rowsCountTotal atomic.Uint32
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			rowsCountTotal.Add(uint32(rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			rowsCountTotal.Add(uint32(len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 
 		expectedRowsCount := blocksPerStream
 		if n := rowsCountTotal.Load(); n != uint32(expectedRowsCount) {
@@ -281,11 +282,11 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 	})
 	t.Run("missing-time-range", func(_ *testing.T) {
 		minTimestamp := baseTimestamp + (rowsPerBlock+1)*1e9
@@ -295,11 +296,11 @@ func TestStorageRunQuery(t *testing.T) {
 			AccountID: 1,
 			ProjectID: 11,
 		}
-		processBlock := func(_ uint, rowsCount int, _ []BlockColumn) {
-			panic(fmt.Errorf("unexpected match for %d rows", rowsCount))
+		writeBlock := func(_ uint, timestamps []int64, _ []BlockColumn) {
+			panic(fmt.Errorf("unexpected match for %d rows", len(timestamps)))
 		}
 		tenantIDs := []TenantID{tenantID}
-		s.RunQuery(tenantIDs, q, nil, processBlock)
+		s.RunQuery(context.Background(), tenantIDs, q, writeBlock)
 	})
 
 	// Close the storage and delete its data