This commit is contained in:
Aliaksandr Valialkin 2024-05-20 00:13:03 +02:00
parent b8b6d0eca8
commit 3ed168ec8d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB

View file

@ -6,6 +6,7 @@ import (
"math" "math"
"slices" "slices"
"sort" "sort"
"strings"
"sync" "sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@ -73,20 +74,8 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
if err != nil { if err != nil {
return err return err
} }
return s.runQuery(ctx, tenantIDs, qNew, writeBlock)
}
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error { writeBlockResult := func(workerID uint, br *blockResult) {
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
filter: q.f,
neededColumnNames: neededColumnNames,
unneededColumnNames: unneededColumnNames,
needAllColumns: slices.Contains(neededColumnNames, "*"),
}
pp := newDefaultPipeProcessor(func(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 { if len(br.timestamps) == 0 {
return return
} }
@ -106,11 +95,25 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
brs.cs = csDst brs.cs = csDst
putBlockRows(brs) putBlockRows(brs)
}) }
return s.runQuery(ctx, tenantIDs, qNew, writeBlockResult)
}
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error {
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
filter: q.f,
neededColumnNames: neededColumnNames,
unneededColumnNames: unneededColumnNames,
needAllColumns: slices.Contains(neededColumnNames, "*"),
}
workersCount := cgroup.AvailableCPUs() workersCount := cgroup.AvailableCPUs()
ppMain := pp ppMain := newDefaultPipeProcessor(writeBlockResultFunc)
pp := ppMain
stopCh := ctx.Done() stopCh := ctx.Done()
cancels := make([]func(), len(q.pipes)) cancels := make([]func(), len(q.pipes))
pps := make([]pipeProcessor, len(q.pipes)) pps := make([]pipeProcessor, len(q.pipes))
@ -192,16 +195,28 @@ func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool {
func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) {
var values []string var values []string
var valuesLock sync.Mutex var valuesLock sync.Mutex
writeBlock := func(workerID uint, timestamps []int64, columns []BlockColumn) { writeBlockResult := func(workerID uint, br *blockResult) {
if len(columns) != 1 { if len(br.timestamps) == 0 {
logger.Panicf("BUG: expecting only a single column; got %d columns", len(columns)) return
} }
cs := br.getColumns()
if len(cs) != 1 {
logger.Panicf("BUG: expecting only a single column; got %d columns", len(cs))
}
columnValues := cs[0].getValues(br)
columnValuesCopy := make([]string, len(columnValues))
for i, v := range columnValues {
columnValuesCopy[i] = strings.Clone(v)
}
valuesLock.Lock() valuesLock.Lock()
values = append(values, columns[0].Values...) values = append(values, columnValuesCopy...)
valuesLock.Unlock() valuesLock.Unlock()
} }
err := s.runQuery(ctx, tenantIDs, q, writeBlock) err := s.runQuery(ctx, tenantIDs, q, writeBlockResult)
if err != nil { if err != nil {
return nil, err return nil, err
} }