From 3ed168ec8d38273eb242f4c85efba74bd4397d5c Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 20 May 2024 00:13:03 +0200 Subject: [PATCH] wip --- lib/logstorage/storage_search.go | 55 ++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go index 6ed305ecaa..51664d481e 100644 --- a/lib/logstorage/storage_search.go +++ b/lib/logstorage/storage_search.go @@ -6,6 +6,7 @@ import ( "math" "slices" "sort" + "strings" "sync" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" @@ -73,20 +74,8 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, if err != nil { return err } - return s.runQuery(ctx, tenantIDs, qNew, writeBlock) -} -func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock WriteBlockFunc) error { - neededColumnNames, unneededColumnNames := q.getNeededColumns() - so := &genericSearchOptions{ - tenantIDs: tenantIDs, - filter: q.f, - neededColumnNames: neededColumnNames, - unneededColumnNames: unneededColumnNames, - needAllColumns: slices.Contains(neededColumnNames, "*"), - } - - pp := newDefaultPipeProcessor(func(workerID uint, br *blockResult) { + writeBlockResult := func(workerID uint, br *blockResult) { if len(br.timestamps) == 0 { return } @@ -106,11 +95,25 @@ func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, brs.cs = csDst putBlockRows(brs) - }) + } + + return s.runQuery(ctx, tenantIDs, qNew, writeBlockResult) +} + +func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error { + neededColumnNames, unneededColumnNames := q.getNeededColumns() + so := &genericSearchOptions{ + tenantIDs: tenantIDs, + filter: q.f, + neededColumnNames: neededColumnNames, + unneededColumnNames: unneededColumnNames, + needAllColumns: slices.Contains(neededColumnNames, "*"), + } workersCount := cgroup.AvailableCPUs() - ppMain := pp + ppMain := newDefaultPipeProcessor(writeBlockResultFunc) + pp := ppMain stopCh := ctx.Done() cancels := make([]func(), len(q.pipes)) pps := make([]pipeProcessor, len(q.pipes)) @@ -192,16 +195,28 @@ func endsWithPipeUniqSingleField(pipes []pipe, fieldName string) bool { func (s *Storage) runSingleColumnQuery(ctx context.Context, tenantIDs []TenantID, q *Query) ([]string, error) { var values []string var valuesLock sync.Mutex - writeBlock := func(workerID uint, timestamps []int64, columns []BlockColumn) { - if len(columns) != 1 { - logger.Panicf("BUG: expecting only a single column; got %d columns", len(columns)) + writeBlockResult := func(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return } + + cs := br.getColumns() + if len(cs) != 1 { + logger.Panicf("BUG: expecting only a single column; got %d columns", len(cs)) + } + columnValues := cs[0].getValues(br) + + columnValuesCopy := make([]string, len(columnValues)) + for i, v := range columnValues { + columnValuesCopy[i] = strings.Clone(v) + } + valuesLock.Lock() - values = append(values, columns[0].Values...) + values = append(values, columnValuesCopy...) valuesLock.Unlock() } - err := s.runQuery(ctx, tenantIDs, q, writeBlock) + err := s.runQuery(ctx, tenantIDs, q, writeBlockResult) if err != nil { return nil, err }