This commit is contained in:
Aliaksandr Valialkin 2024-04-27 22:08:03 +02:00
parent f9d0b21bb9
commit 75914210ec
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 54 additions and 34 deletions

View file

@ -64,10 +64,15 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
blockResultPool.Put(bb)
}
vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
sw.FinalFlush()
putSortWriter(sw)
if err != nil {
httpserver.Errorf(w, r, "cannot execute query: %s", err)
}
}
var blockResultPool bytesutil.ByteBufferPool

View file

@ -101,8 +101,8 @@ func MustAddRows(lr *logstorage.LogRows) {
}
// RunQuery runs the given q and calls writeBlock for the returned data blocks
func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) {
strg.RunQuery(ctx, tenantIDs, q, writeBlock)
func RunQuery(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, writeBlock func(workerID uint, timestamps []int64, columns []logstorage.BlockColumn)) error {
return strg.RunQuery(ctx, tenantIDs, q, writeBlock)
}
func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {

View file

@ -24,7 +24,7 @@ type pipe interface {
// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
//
// The returned pipeProcessor may call cancel() at any time in order to notify writeBlock callers that it doesn't accept more data.
// The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to pipeProcessor.
newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
}
@ -32,16 +32,24 @@ type pipe interface {
type pipeProcessor interface {
// writeBlock must write the given block of data to the given pipeProcessor.
//
// The workerID is the id of the worker goroutine, which called the writeBlock.
// writeBlock is called concurrently from worker goroutines.
// The workerID is the id of the worker goroutine, which calls the writeBlock.
// It is in the range 0 ... workersCount-1 .
//
// It is forbidden to hold references to columns after returning from writeBlock, since the caller re-uses columns.
//
// If any error occurs at writeBlock, then cancel() must be called by pipeProcessor in order to notify worker goroutines
// to stop sending new data. The occurred error must be returned from flush().
//
// cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error.
writeBlock(workerID uint, timestamps []int64, columns []BlockColumn)
// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
//
// The pipeProcessor must call cancel() and ppBase.flush(), which has been passed to newPipeProcessor, before returning from the flush.
flush()
// flush is called after all the worker goroutines are stopped.
//
// It is guaranteed that flush() is called for every pipeProcessor returned from pipe.newPipeProcessor().
flush() error
}
type defaultPipeProcessor func(workerID uint, timestamps []int64, columns []BlockColumn)
@ -54,8 +62,8 @@ func (dpp defaultPipeProcessor) writeBlock(workerID uint, timestamps []int64, co
dpp(workerID, timestamps, columns)
}
func (dpp defaultPipeProcessor) flush() {
// Nothing to do
func (dpp defaultPipeProcessor) flush() error {
return nil
}
func parsePipes(lex *lexer) ([]pipe, error) {
@ -111,17 +119,15 @@ func (fp *fieldsPipe) String() string {
return "fields " + fieldNamesString(fp.fields)
}
func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
func (fp *fieldsPipe) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
return &fieldsPipeProcessor{
fp: fp,
cancel: cancel,
ppBase: ppBase,
}
}
type fieldsPipeProcessor struct {
fp *fieldsPipe
cancel func()
ppBase pipeProcessor
}
@ -147,9 +153,8 @@ func (fpp *fieldsPipeProcessor) writeBlock(workerID uint, timestamps []int64, co
putBlockRows(brs)
}
func (fpp *fieldsPipeProcessor) flush() {
fpp.cancel()
fpp.ppBase.flush()
func (fpp *fieldsPipeProcessor) flush() error {
return nil
}
func parseFieldsPipe(lex *lexer) (*fieldsPipe, error) {
@ -330,12 +335,7 @@ func (spp *statsPipeProcessor) writeBlock(workerID uint, timestamps []int64, col
shard.keyBuf = keyBuf
}
func (spp *statsPipeProcessor) flush() {
defer func() {
spp.cancel()
spp.ppBase.flush()
}()
func (spp *statsPipeProcessor) flush() error {
// Merge states across shards
shards := spp.shards
m := shards[0].m
@ -347,7 +347,7 @@ func (spp *statsPipeProcessor) flush() {
// Stop processing data as soon as stopCh is closed without wasting CPU time.
select {
case <-spp.stopCh:
return
return nil
default:
}
@ -377,7 +377,7 @@ func (spp *statsPipeProcessor) flush() {
// Stop processing data as soon as stopCh is closed without wasting CPU time.
select {
case <-spp.stopCh:
return
return nil
default:
}
@ -415,6 +415,8 @@ func (spp *statsPipeProcessor) flush() {
}
spp.ppBase.writeBlock(0, []int64{0}, columns)
}
return nil
}
func (sp *statsPipe) neededFields() []string {
@ -820,9 +822,8 @@ func (hpp *headPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
hpp.cancel()
}
func (hpp *headPipeProcessor) flush() {
hpp.cancel()
hpp.ppBase.flush()
func (hpp *headPipeProcessor) flush() error {
return nil
}
func parseHeadPipe(lex *lexer) (*headPipe, error) {
@ -848,17 +849,15 @@ func (sp *skipPipe) String() string {
return fmt.Sprintf("skip %d", sp.n)
}
func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
func (sp *skipPipe) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
return &skipPipeProcessor{
sp: sp,
cancel: cancel,
ppBase: ppBase,
}
}
type skipPipeProcessor struct {
sp *skipPipe
cancel func()
ppBase pipeProcessor
rowsSkipped atomic.Uint64
@ -887,9 +886,8 @@ func (spp *skipPipeProcessor) writeBlock(workerID uint, timestamps []int64, colu
spp.ppBase.writeBlock(workerID, timestamps, cs)
}
func (spp *skipPipeProcessor) flush() {
spp.cancel()
spp.ppBase.flush()
func (spp *skipPipeProcessor) flush() error {
return nil
}
func parseSkipPipe(lex *lexer) (*skipPipe, error) {

View file

@ -45,7 +45,7 @@ type searchOptions struct {
}
// RunQuery runs the given q and calls writeBlock for results.
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) {
func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlock func(workerID uint, timestamps []int64, columns []BlockColumn)) error {
resultColumnNames := q.getResultColumnNames()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
@ -56,13 +56,19 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
workersCount := cgroup.AvailableCPUs()
pp := newDefaultPipeProcessor(writeBlock)
ppMain := pp
stopCh := ctx.Done()
cancels := make([]func(), len(q.pipes))
pps := make([]pipeProcessor, len(q.pipes))
for i := len(q.pipes) - 1; i >= 0; i-- {
p := q.pipes[i]
ctxChild, cancel := context.WithCancel(ctx)
stopCh = ctxChild.Done()
pp = p.newPipeProcessor(workersCount, stopCh, cancel, pp)
ctx = ctxChild
cancels[i] = cancel
pps[i] = pp
}
s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) {
@ -81,7 +87,18 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
putBlockRows(brs)
})
pp.flush()
var errFlush error
for i, pp := range pps {
if err := pp.flush(); err != nil && errFlush == nil {
errFlush = err
}
cancel := cancels[i]
cancel()
}
if err := ppMain.flush(); err != nil && errFlush == nil {
errFlush = err
}
return errFlush
}
type blockRows struct {