From 75914210ec1295512c4b6379cf50141b89b9a575 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin
Date: Sat, 27 Apr 2024 22:08:03 +0200
Subject: [PATCH] wip

---
 app/vlselect/logsql/logsql.go    |  7 +++-
 app/vlstorage/main.go            |  4 +--
 lib/logstorage/pipes.go          | 56 +++++++++++++++-----------------
 lib/logstorage/storage_search.go | 21 ++++++++++--
 4 files changed, 54 insertions(+), 34 deletions(-)

diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index dbb12ce01..f895693b9 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -64,10 +64,15 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 		blockResultPool.Put(bb)
 	}
 
-	vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
+	err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
 
 	sw.FinalFlush()
 	putSortWriter(sw)
+
+	if err != nil {
+		httpserver.Errorf(w, r, "cannot execute query: %s", err)
+	}
+
 }
 
 var blockResultPool bytesutil.ByteBufferPool
diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go
index 636368b8a..6fb3df74e 100644
--- a/app/vlstorage/main.go
+++ b/app/vlstorage/main.go
@@ -101,8 +101,8 @@ func MustAddRows(lr *logstorage.LogRows) {
 }
 
 // RunQuery runs the given q and calls writeBlock for the returned data blocks
-func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) {
-	strg.RunQuery(ctx, tenantIDs, q, writeBlock)
+func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) error {
+	return strg.RunQuery(ctx, tenantIDs, q, writeBlock)
 }
 
 func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
diff --git a/lib/logstorage/pipes.go b/lib/logstorage/pipes.go
index 6afc417b0..03e6eecf6 100644
--- a/lib/logstorage/pipes.go
+++ b/lib/logstorage/pipes.go
@@ -24,7 +24,7 @@ type pipe interface {
 	// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
 	// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
 	//
-	// The returned pipeProcessor may call cancel() at any time in order to notify writeBlock callers that it doesn't accept more data.
+	// The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to the pipeProcessor.
 	newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
 }
 
@@ -32,16 +32,24 @@ type pipeProcessor interface {
 	// writeBlock must write the given block of data to the given pipeProcessor.
 	//
-	// The workerID is the id of the worker goroutine, which called the writeBlock.
+	// writeBlock is called concurrently from worker goroutines.
+	// The workerID is the id of the worker goroutine that calls writeBlock.
 	// It is in the range 0 ... workersCount-1 .
 	//
 	// It is forbidden to hold references to columns after returning from writeBlock, since the caller re-uses columns.
+	//
+	// If an error occurs in writeBlock, then the pipeProcessor must call cancel() in order to notify worker goroutines
+	// to stop sending new data. The error must then be returned from flush().
+	//
+	// cancel() may also be called when the pipeProcessor decides to stop accepting new data, even if no error has occurred.
 	writeBlock(workerID uint, timestamps []int64, columns []BlockColumn)
 
 	// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
 	//
-	// The pipeProcessor must call cancel() and ppBase.flush(), which has been passed to newPipeProcessor, before returning from the flush.
-	flush()
+	// flush is called after all the worker goroutines are stopped.
+	//
+	// It is guaranteed that flush() is called for every pipeProcessor returned from pipe.newPipeProcessor().
+	flush() error
 }
 
 type defaultPipeProcessor func(workerID uint, timestamps []int64, columns []BlockColumn)
 
@@ -54,8 +62,8 @@ func (dpp defaultPipeProcessor) writeBlock(workerID uint, timestamps []int64, co
 	dpp(workerID, timestamps, columns)
 }
 
-func (dpp defaultPipeProcessor) flush() {
-	// Nothing to do
+func (dpp defaultPipeProcessor) flush() error {
+	return nil
 }
 
 func parsePipes(lex *lexer) ([]pipe, error) {
@@ -111,17 +119,15 @@ func (fp *fieldsPipe) String() string {
 	return "fields " + fieldNamesString(fp.fields)
 }
 
-func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
 	return &fieldsPipeProcessor{
 		fp:     fp,
-		cancel: cancel,
 		ppBase: ppBase,
 	}
 }
 
 type fieldsPipeProcessor struct {
 	fp     *fieldsPipe
-	cancel func()
 	ppBase pipeProcessor
 }
 
@@ -147,9 +153,8 @@ func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, co
 	putBlockRows(brs)
 }
 
-func (fpp *fieldsPipeProcessor) flush() {
-	fpp.cancel()
-	fpp.ppBase.flush()
+func (fpp *fieldsPipeProcessor) flush() error {
+	return nil
 }
 
 func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
@@ -330,12 +335,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
 	shard.keyBuf = keyBuf
 }
 
-func (spp *statsPipeProcessor) flush() {
-	defer func() {
-		spp.cancel()
-		spp.ppBase.flush()
-	}()
-
+func (spp *statsPipeProcessor) flush() error {
 	// Merge states across shards
 	shards := spp.shards
 	m := shards[0].m
@@ -347,7 +347,7 @@ func (spp *statsPipeProcessor) flush() {
 	// Stop processing data as soon as stopCh is closed without wasting CPU time.
 	select {
 	case <-spp.stopCh:
-		return
+		return nil
 	default:
 	}
 
@@ -377,7 +377,7 @@ func (spp *statsPipeProcessor) flush() {
 		// Stop processing data as soon as stopCh is closed without wasting CPU time.
 		select {
 		case <-spp.stopCh:
-			return
+			return nil
 		default:
 		}
 
@@ -415,6 +415,8 @@ func (spp *statsPipeProcessor) flush() {
 		}
 		spp.ppBase.writeBlock(0, []int64{0}, columns)
 	}
+
+	return nil
 }
 
 func (sp *statsPipe) neededFields() []string {
@@ -820,9 +822,8 @@ func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
 	hpp.cancel()
 }
 
-func (hpp *headPipeProcessor) flush() {
-	hpp.cancel()
-	hpp.ppBase.flush()
+func (hpp *headPipeProcessor) flush() error {
+	return nil
 }
 
 func parseHeadPipe(lex *lexer) (*headPipe, error) {
@@ -848,17 +849,15 @@ func (sp *skipPipe) String() string {
 	return fmt.Sprintf("skip %d", sp.n)
 }
 
-func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
 	return &skipPipeProcessor{
 		sp:     sp,
-		cancel: cancel,
 		ppBase: ppBase,
 	}
 }
 
 type skipPipeProcessor struct {
 	sp     *skipPipe
-	cancel func()
 	ppBase pipeProcessor
 
 	rowsSkipped atomic.Uint64
@@ -887,9 +886,8 @@ func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
 	spp.ppBase.writeBlock(workerID, timestamps, cs)
 }
 
-func (spp *skipPipeProcessor) flush() {
-	spp.cancel()
-	spp.ppBase.flush()
+func (spp *skipPipeProcessor) flush() error {
+	return nil
 }
 
 func parseSkipPipe(lex *lexer) (*skipPipe, error) {
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index 8637f86ef..f96bb64ec 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -45,7 +45,7 @@ type searchOptions struct {
 }
 
 // RunQuery runs the given q and calls writeBlock for results.
-func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) {
+func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) error {
 	resultColumnNames := q.getResultColumnNames()
 	so := &genericSearchOptions{
 		tenantIDs:         tenantIDs,
@@ -56,13 +56,19 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
 	workersCount := cgroup.AvailableCPUs()
 
 	pp := newDefaultPipeProcessor(writeBlock)
+	ppMain := pp
 	stopCh := ctx.Done()
+	cancels := make([]func(), len(q.pipes))
+	pps := make([]pipeProcessor, len(q.pipes))
 	for i := len(q.pipes) - 1; i >= 0; i-- {
 		p := q.pipes[i]
 		ctxChild, cancel := context.WithCancel(ctx)
 		stopCh = ctxChild.Done()
 		pp = p.newPipeProcessor(workersCount, stopCh, cancel, pp)
 		ctx = ctxChild
+
+		cancels[i] = cancel
+		pps[i] = pp
 	}
 
 	s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) {
@@ -81,7 +87,18 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
 		putBlockRows(brs)
 	})
 
-	pp.flush()
+	var errFlush error
+	for i, pp := range pps {
+		if err := pp.flush(); err != nil && errFlush == nil {
+			errFlush = err
+		}
+		cancel := cancels[i]
+		cancel()
+	}
+	if err := ppMain.flush(); err != nil && errFlush == nil {
+		errFlush = err
+	}
+	return errFlush
 }
 
 type blockRows struct {
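
Note: the sketch below is not part of the patch. It is a minimal standalone illustration of the updated pipeProcessor contract: when an error occurs inside writeBlock, the processor calls cancel() so that worker goroutines stop sending new data, and the stored error is then reported from flush(). The capPipeProcessor name and its row-limit behaviour are hypothetical; BlockColumn and the pipeProcessor interface are re-declared locally, since the real types are unexported in lib/logstorage.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Local mirrors of the unexported types from lib/logstorage,
// re-declared here only so that the sketch compiles on its own.
type BlockColumn struct {
	Name   string
	Values []string
}

type pipeProcessor interface {
	writeBlock(workerID uint, timestamps []int64, columns []BlockColumn)
	flush() error
}

// capPipeProcessor is a hypothetical pipeProcessor, which accepts at most
// maxRows rows. It follows the updated contract: on an error inside
// writeBlock it calls cancel() in order to notify worker goroutines
// to stop sending new data, and it returns the stored error from flush().
type capPipeProcessor struct {
	cancel  func()
	maxRows uint64

	rowsProcessed atomic.Uint64

	errMu sync.Mutex
	err   error
}

var _ pipeProcessor = (*capPipeProcessor)(nil)

func (cpp *capPipeProcessor) writeBlock(workerID uint, timestamps []int64, columns []BlockColumn) {
	if cpp.rowsProcessed.Add(uint64(len(timestamps))) <= cpp.maxRows {
		return // the block fits; a real pipe would pass it to ppBase here
	}
	// Remember the first error and ask the workers to stop sending data.
	cpp.errMu.Lock()
	if cpp.err == nil {
		cpp.err = errors.New("row limit exceeded")
	}
	cpp.errMu.Unlock()
	cpp.cancel()
}

// flush is called after all the worker goroutines are stopped, so it just
// reports the error captured in writeBlock, if any.
func (cpp *capPipeProcessor) flush() error {
	cpp.errMu.Lock()
	defer cpp.errMu.Unlock()
	return cpp.err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	pp := &capPipeProcessor{cancel: cancel, maxRows: 3}

	// Simulate worker goroutines, which stop once ctx is cancelled.
	var wg sync.WaitGroup
	for workerID := uint(0); workerID < 4; workerID++ {
		wg.Add(1)
		go func(workerID uint) {
			defer wg.Done()
			for i := 0; i < 10; i++ {
				select {
				case <-ctx.Done():
					return // the pipeProcessor asked us to stop
				default:
				}
				pp.writeBlock(workerID, []int64{0, 1}, nil)
			}
		}(workerID)
	}
	wg.Wait()

	// flush runs only after the workers are stopped and surfaces the error,
	// mirroring the flush loop at the end of Storage.RunQuery.
	if err := pp.flush(); err != nil {
		fmt.Printf("query failed: %s\n", err)
	}
}

Because flush() is guaranteed to run after all worker goroutines have stopped, the error captured in writeBlock can be returned from flush() without racing with the workers.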
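A second standalone sketch, again not part of the patch and using a hypothetical three-pipe pipeline, illustrates the cancellation chaining set up by the loop in Storage.RunQuery: every pipe receives its own cancel(), each child context is derived from the previous one, and the stopCh handed to the worker goroutines is the innermost Done() channel, so cancelling any single pipe also stops the workers and every pipe between it and the storage.

package main

import (
	"context"
	"fmt"
)

func main() {
	ctx := context.Background()

	// A hypothetical pipeline of three pipes. Mirroring the loop in the
	// patch, the pipes are wrapped from the last one to the first one,
	// so the first pipe ends up with the innermost context.
	pipes := []string{"fields", "head", "stats"}
	cancels := make([]func(), len(pipes))
	stopCh := ctx.Done()
	for i := len(pipes) - 1; i >= 0; i-- {
		ctxChild, cancel := context.WithCancel(ctx)
		stopCh = ctxChild.Done()
		cancels[i] = cancel
		ctx = ctxChild
		fmt.Printf("pipe %q got its own cancel()\n", pipes[i])
	}

	// Cancelling any pipe also cancels every context derived after it,
	// so stopCh, the innermost Done() channel handed to the worker
	// goroutines, is guaranteed to be closed.
	cancels[len(pipes)-1]()
	<-stopCh
	fmt.Println("stopCh is closed, so the worker goroutines would stop here")

	// Release the remaining contexts, as RunQuery does after the flushes.
	for _, cancel := range cancels {
		cancel()
	}
}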