From 1416b5f813953e6df246eb57a7df2b18c0c7c624 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 25 May 2024 20:13:01 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe.go | 8 +- lib/logstorage/pipe_copy.go | 8 +- lib/logstorage/pipe_delete.go | 8 +- lib/logstorage/pipe_extract.go | 10 +- lib/logstorage/pipe_field_names.go | 10 +- lib/logstorage/pipe_fields.go | 8 +- lib/logstorage/pipe_filter.go | 14 +- lib/logstorage/pipe_format.go | 10 +- lib/logstorage/pipe_limit.go | 12 +- lib/logstorage/pipe_offset.go | 10 +- lib/logstorage/pipe_pack_json.go | 8 +- lib/logstorage/pipe_rename.go | 8 +- lib/logstorage/pipe_replace.go | 4 +- lib/logstorage/pipe_replace_regexp.go | 4 +- lib/logstorage/pipe_sort.go | 18 +- lib/logstorage/pipe_stats.go | 12 +- lib/logstorage/pipe_topk.go | 12 +- lib/logstorage/pipe_uniq.go | 12 +- lib/logstorage/pipe_unpack.go | 24 +-- lib/logstorage/pipe_unpack_json.go | 4 +- lib/logstorage/pipe_unpack_json_test.go | 219 ----------------------- lib/logstorage/pipe_unpack_logfmt.go | 4 +- lib/logstorage/pipe_unroll.go | 10 +- lib/logstorage/pipe_update.go | 10 +- lib/logstorage/pipe_utils_test.go | 224 ++++++++++++++++++++++++ 25 files changed, 338 insertions(+), 333 deletions(-) create mode 100644 lib/logstorage/pipe_utils_test.go diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 0806063b9..d01fdc4be 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -11,15 +11,15 @@ type pipe interface { // updateNeededFields must update neededFields and unneededFields with fields it needs and not needs at the input. updateNeededFields(neededFields, unneededFields fieldsSet) - // newPipeProcessor must return new pipeProcessor for the given ppBase. + // newPipeProcessor must return new pipeProcessor, which writes data to the given ppNext. // // workersCount is the number of goroutine workers, which will call writeBlock() method. // // If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds. // It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds. // - // The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to pipeProcessor. - newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor + // The returned pipeProcessor may call cancel() at any time in order to notify the caller to stop sending new data to it. + newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor // optimize must optimize the pipe optimize() @@ -50,7 +50,7 @@ type pipeProcessor interface { // cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error. writeBlock(workerID uint, br *blockResult) - // flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor. + // flush must flush all the data accumulated in the pipeProcessor to the next pipeProcessor. // // flush is called after all the worker goroutines are stopped. // diff --git a/lib/logstorage/pipe_copy.go b/lib/logstorage/pipe_copy.go index 858cba747..340b1d97f 100644 --- a/lib/logstorage/pipe_copy.go +++ b/lib/logstorage/pipe_copy.go @@ -62,16 +62,16 @@ func (pc *pipeCopy) initFilterInValues(cache map[string][]string, getFieldValues return pc, nil } -func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeCopyProcessor{ pc: pc, - ppBase: ppBase, + ppNext: ppNext, } } type pipeCopyProcessor struct { pc *pipeCopy - ppBase pipeProcessor + ppNext pipeProcessor } func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) { @@ -80,7 +80,7 @@ func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) { } br.copyColumns(pcp.pc.srcFields, pcp.pc.dstFields) - pcp.ppBase.writeBlock(workerID, br) + pcp.ppNext.writeBlock(workerID, br) } func (pcp *pipeCopyProcessor) flush() error { diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go index d329482de..0bf861076 100644 --- a/lib/logstorage/pipe_delete.go +++ b/lib/logstorage/pipe_delete.go @@ -44,16 +44,16 @@ func (pd *pipeDelete) initFilterInValues(cache map[string][]string, getFieldValu return pd, nil } -func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeDeleteProcessor{ pd: pd, - ppBase: ppBase, + ppNext: ppNext, } } type pipeDeleteProcessor struct { pd *pipeDelete - ppBase pipeProcessor + ppNext pipeProcessor } func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) { @@ -62,7 +62,7 @@ func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) { } br.deleteColumns(pdp.pd.fields) - pdp.ppBase.writeBlock(workerID, br) + pdp.ppNext.writeBlock(workerID, br) } func (pdp *pipeDeleteProcessor) flush() error { diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 31bfcbd4b..e5c950592 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -101,10 +101,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet } } -func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeExtractProcessor{ pe: pe, - ppBase: ppBase, + ppNext: ppNext, shards: make([]pipeExtractProcessorShard, workersCount), } @@ -112,7 +112,7 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f type pipeExtractProcessor struct { pe *pipeExtract - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeExtractProcessorShard } @@ -149,7 +149,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { if iff := pe.iff; iff != nil { iff.f.applyToBlockResult(br, bm) if bm.isZero() { - pep.ppBase.writeBlock(workerID, br) + pep.ppNext.writeBlock(workerID, br) return } } @@ -214,7 +214,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { for i := range rcs { br.addResultColumn(&rcs[i]) } - pep.ppBase.writeBlock(workerID, br) + pep.ppNext.writeBlock(workerID, br) for i := range rcs { rcs[i].reset() diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 63eff0ab0..3d4faf31a 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -49,13 +49,13 @@ func (pf *pipeFieldNames) initFilterInValues(cache map[string][]string, getField return pf, nil } -func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { shards := make([]pipeFieldNamesProcessorShard, workersCount) pfp := &pipeFieldNamesProcessor{ pf: pf, stopCh: stopCh, - ppBase: ppBase, + ppNext: ppNext, shards: shards, } @@ -65,7 +65,7 @@ func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struc type pipeFieldNamesProcessor struct { pf *pipeFieldNames stopCh <-chan struct{} - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeFieldNamesProcessorShard } @@ -184,10 +184,10 @@ func (wctx *pipeFieldNamesWriteContext) flush() { wctx.valuesLen = 0 - // Flush rcs to ppBase + // Flush rcs to ppNext br.setResultColumns(wctx.rcs[:], wctx.rowsCount) wctx.rowsCount = 0 - wctx.pfp.ppBase.writeBlock(0, br) + wctx.pfp.ppNext.writeBlock(0, br) br.reset() wctx.rcs[0].resetValues() wctx.rcs[1].resetValues() diff --git a/lib/logstorage/pipe_fields.go b/lib/logstorage/pipe_fields.go index 0fa12212f..f0fb5873a 100644 --- a/lib/logstorage/pipe_fields.go +++ b/lib/logstorage/pipe_fields.go @@ -61,16 +61,16 @@ func (pf *pipeFields) initFilterInValues(cache map[string][]string, getFieldValu return pf, nil } -func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeFieldsProcessor{ pf: pf, - ppBase: ppBase, + ppNext: ppNext, } } type pipeFieldsProcessor struct { pf *pipeFields - ppBase pipeProcessor + ppNext pipeProcessor } func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) { @@ -81,7 +81,7 @@ func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) { if !pfp.pf.containsStar { br.setColumns(pfp.pf.fields) } - pfp.ppBase.writeBlock(workerID, br) + pfp.ppNext.writeBlock(workerID, br) } func (pfp *pipeFieldsProcessor) flush() error { diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go index 050901ec8..ffc0b3f6c 100644 --- a/lib/logstorage/pipe_filter.go +++ b/lib/logstorage/pipe_filter.go @@ -47,12 +47,12 @@ func (pf *pipeFilter) initFilterInValues(cache map[string][]string, getFieldValu return &pfNew, nil } -func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { shards := make([]pipeFilterProcessorShard, workersCount) pfp := &pipeFilterProcessor{ pf: pf, - ppBase: ppBase, + ppNext: ppNext, shards: shards, } @@ -61,7 +61,7 @@ func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu type pipeFilterProcessor struct { pf *pipeFilter - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeFilterProcessorShard } @@ -90,8 +90,8 @@ func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) { bm.setBits() pfp.pf.f.applyToBlockResult(br, bm) if bm.areAllBitsSet() { - // Fast path - the filter didn't filter out anything - send br to the base pipe as is. - pfp.ppBase.writeBlock(workerID, br) + // Fast path - the filter didn't filter out anything - send br to the next pipe as is. + pfp.ppNext.writeBlock(workerID, br) return } if bm.isZero() { @@ -99,9 +99,9 @@ func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) { return } - // Slow path - copy the remaining rows from br to shard.br before sending them to base pipe. + // Slow path - copy the remaining rows from br to shard.br before sending them to the next pipe. shard.br.initFromFilterAllColumns(br, bm) - pfp.ppBase.writeBlock(workerID, &shard.br) + pfp.ppNext.writeBlock(workerID, &shard.br) } func (pfp *pipeFilterProcessor) flush() error { diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index 76cf74209..de580110e 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -90,10 +90,10 @@ func (pf *pipeFormat) initFilterInValues(cache map[string][]string, getFieldValu return &pfNew, nil } -func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeFormatProcessor{ pf: pf, - ppBase: ppBase, + ppNext: ppNext, shards: make([]pipeFormatProcessorShard, workersCount), } @@ -101,7 +101,7 @@ func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu type pipeFormatProcessor struct { pf *pipeFormat - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeFormatProcessorShard } @@ -134,7 +134,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { if iff := pf.iff; iff != nil { iff.f.applyToBlockResult(br, bm) if bm.isZero() { - pfp.ppBase.writeBlock(workerID, br) + pfp.ppNext.writeBlock(workerID, br) return } } @@ -158,7 +158,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) { } br.addResultColumn(&shard.rc) - pfp.ppBase.writeBlock(workerID, br) + pfp.ppNext.writeBlock(workerID, br) shard.a.reset() shard.rc.reset() diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go index 5e5d0692c..2f4054393 100644 --- a/lib/logstorage/pipe_limit.go +++ b/lib/logstorage/pipe_limit.go @@ -32,7 +32,7 @@ func (pl *pipeLimit) initFilterInValues(cache map[string][]string, getFieldValue return pl, nil } -func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { if pl.limit == 0 { // Special case - notify the caller to stop writing data to the returned pipeLimitProcessor cancel() @@ -40,14 +40,14 @@ func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), p return &pipeLimitProcessor{ pl: pl, cancel: cancel, - ppBase: ppBase, + ppNext: ppNext, } } type pipeLimitProcessor struct { pl *pipeLimit cancel func() - ppBase pipeProcessor + ppNext pipeProcessor rowsProcessed atomic.Uint64 } @@ -59,8 +59,8 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps))) if rowsProcessed <= plp.pl.limit { - // Fast path - write all the rows to ppBase. - plp.ppBase.writeBlock(workerID, br) + // Fast path - write all the rows to ppNext. + plp.ppNext.writeBlock(workerID, br) return } @@ -74,7 +74,7 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) { // Write remaining rows. keepRows := plp.pl.limit - rowsProcessed br.truncateRows(int(keepRows)) - plp.ppBase.writeBlock(workerID, br) + plp.ppNext.writeBlock(workerID, br) // Notify the caller that it should stop passing more data to writeBlock(). plp.cancel() diff --git a/lib/logstorage/pipe_offset.go b/lib/logstorage/pipe_offset.go index f6fd2aa1c..fc0363d6a 100644 --- a/lib/logstorage/pipe_offset.go +++ b/lib/logstorage/pipe_offset.go @@ -32,16 +32,16 @@ func (po *pipeOffset) initFilterInValues(cache map[string][]string, getFieldValu return po, nil } -func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeOffsetProcessor{ po: po, - ppBase: ppBase, + ppNext: ppNext, } } type pipeOffsetProcessor struct { po *pipeOffset - ppBase pipeProcessor + ppNext pipeProcessor rowsProcessed atomic.Uint64 } @@ -58,13 +58,13 @@ func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) { rowsProcessed -= uint64(len(br.timestamps)) if rowsProcessed >= pop.po.offset { - pop.ppBase.writeBlock(workerID, br) + pop.ppNext.writeBlock(workerID, br) return } rowsSkip := pop.po.offset - rowsProcessed br.skipRows(int(rowsSkip)) - pop.ppBase.writeBlock(workerID, br) + pop.ppNext.writeBlock(workerID, br) } func (pop *pipeOffsetProcessor) flush() error { diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go index fb6988a1d..1f3e6191b 100644 --- a/lib/logstorage/pipe_pack_json.go +++ b/lib/logstorage/pipe_pack_json.go @@ -46,10 +46,10 @@ func (pp *pipePackJSON) initFilterInValues(cache map[string][]string, getFieldVa return pp, nil } -func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipePackJSONProcessor{ pp: pp, - ppBase: ppBase, + ppNext: ppNext, shards: make([]pipePackJSONProcessorShard, workersCount), } @@ -57,7 +57,7 @@ func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ type pipePackJSONProcessor struct { pp *pipePackJSON - ppBase pipeProcessor + ppNext pipeProcessor shards []pipePackJSONProcessorShard } @@ -106,7 +106,7 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { } br.addResultColumn(&shard.rc) - ppp.ppBase.writeBlock(workerID, br) + ppp.ppNext.writeBlock(workerID, br) shard.rc.reset() } diff --git a/lib/logstorage/pipe_rename.go b/lib/logstorage/pipe_rename.go index a99cb84bd..44ded34ac 100644 --- a/lib/logstorage/pipe_rename.go +++ b/lib/logstorage/pipe_rename.go @@ -66,16 +66,16 @@ func (pr *pipeRename) initFilterInValues(cache map[string][]string, getFieldValu return pr, nil } -func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeRenameProcessor{ pr: pr, - ppBase: ppBase, + ppNext: ppNext, } } type pipeRenameProcessor struct { pr *pipeRename - ppBase pipeProcessor + ppNext pipeProcessor } func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) { @@ -84,7 +84,7 @@ func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) { } br.renameColumns(prp.pr.srcFields, prp.pr.dstFields) - prp.ppBase.writeBlock(workerID, br) + prp.ppNext.writeBlock(workerID, br) } func (prp *pipeRenameProcessor) flush() error { diff --git a/lib/logstorage/pipe_replace.go b/lib/logstorage/pipe_replace.go index f20de6b75..13d66c5a8 100644 --- a/lib/logstorage/pipe_replace.go +++ b/lib/logstorage/pipe_replace.go @@ -57,7 +57,7 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal return &peNew, nil } -func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { updateFunc := func(a *arena, v string) string { bb := bbPool.Get() bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit) @@ -66,7 +66,7 @@ func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f return result } - return newPipeUpdateProcessor(workersCount, updateFunc, ppBase, pr.field, pr.iff) + return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff) } func parsePipeReplace(lex *lexer) (*pipeReplace, error) { diff --git a/lib/logstorage/pipe_replace_regexp.go b/lib/logstorage/pipe_replace_regexp.go index 053dbd0fb..24aa5418c 100644 --- a/lib/logstorage/pipe_replace_regexp.go +++ b/lib/logstorage/pipe_replace_regexp.go @@ -57,7 +57,7 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi return &peNew, nil } -func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { updateFunc := func(a *arena, v string) string { bb := bbPool.Get() bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit) @@ -66,7 +66,7 @@ func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{ return result } - return newPipeUpdateProcessor(workersCount, updateFunc, ppBase, pr.field, pr.iff) + return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff) } diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go index 6150b31cf..da87c7a83 100644 --- a/lib/logstorage/pipe_sort.go +++ b/lib/logstorage/pipe_sort.go @@ -79,14 +79,14 @@ func (ps *pipeSort) initFilterInValues(cache map[string][]string, getFieldValues return ps, nil } -func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { if ps.limit > 0 { - return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppBase) + return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppNext) } - return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppBase) + return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppNext) } -func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeSortProcessorShard, workersCount) @@ -104,7 +104,7 @@ func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{} ps: ps, stopCh: stopCh, cancel: cancel, - ppBase: ppBase, + ppNext: ppNext, shards: shards, @@ -119,7 +119,7 @@ type pipeSortProcessor struct { ps *pipeSort stopCh <-chan struct{} cancel func() - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeSortProcessorShard @@ -534,7 +534,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) { } } if !areEqualColumns { - // send the current block to ppBase and construct a block with new set of columns + // send the current block to ppNext and construct a block with new set of columns wctx.flush() rcs = wctx.rcs[:0] @@ -573,10 +573,10 @@ func (wctx *pipeSortWriteContext) flush() { wctx.valuesLen = 0 - // Flush rcs to ppBase + // Flush rcs to ppNext br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 - wctx.psp.ppBase.writeBlock(0, br) + wctx.psp.ppNext.writeBlock(0, br) br.reset() for i := range rcs { rcs[i].resetValues() diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go index f83235ff2..9e2a13102 100644 --- a/lib/logstorage/pipe_stats.go +++ b/lib/logstorage/pipe_stats.go @@ -148,7 +148,7 @@ func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValue const stateSizeBudgetChunk = 1 << 20 -func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.3) shards := make([]pipeStatsProcessorShard, workersCount) @@ -167,7 +167,7 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, ps: ps, stopCh: stopCh, cancel: cancel, - ppBase: ppBase, + ppNext: ppNext, shards: shards, @@ -182,7 +182,7 @@ type pipeStatsProcessor struct { ps *pipeStats stopCh <-chan struct{} cancel func() - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeStatsProcessorShard @@ -459,7 +459,7 @@ func (psp *pipeStatsProcessor) flush() error { } } - // Write per-group states to ppBase + // Write per-group states to ppNext byFields := psp.ps.byFields if len(byFields) == 0 && len(m) == 0 { // Special case - zero matching rows. @@ -519,7 +519,7 @@ func (psp *pipeStatsProcessor) flush() error { if valuesLen >= 1_000_000 { br.setResultColumns(rcs, rowsCount) rowsCount = 0 - psp.ppBase.writeBlock(0, &br) + psp.ppNext.writeBlock(0, &br) br.reset() for i := range rcs { rcs[i].resetValues() @@ -529,7 +529,7 @@ func (psp *pipeStatsProcessor) flush() error { } br.setResultColumns(rcs, rowsCount) - psp.ppBase.writeBlock(0, &br) + psp.ppNext.writeBlock(0, &br) return nil } diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go index b7108a11b..57938b2fe 100644 --- a/lib/logstorage/pipe_topk.go +++ b/lib/logstorage/pipe_topk.go @@ -13,7 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil" ) -func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeTopkProcessorShard, workersCount) @@ -31,7 +31,7 @@ func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{} ps: ps, stopCh: stopCh, cancel: cancel, - ppBase: ppBase, + ppNext: ppNext, shards: shards, @@ -46,7 +46,7 @@ type pipeTopkProcessor struct { ps *pipeSort stopCh <-chan struct{} cancel func() - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeTopkProcessorShard @@ -464,7 +464,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo } } if !areEqualColumns { - // send the current block to ppBase and construct a block with new set of columns + // send the current block to ppNext and construct a block with new set of columns wctx.flush() rcs = wctx.rcs[:0] @@ -508,10 +508,10 @@ func (wctx *pipeTopkWriteContext) flush() { wctx.valuesLen = 0 - // Flush rcs to ppBase + // Flush rcs to ppNext br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 - wctx.ptp.ppBase.writeBlock(0, br) + wctx.ptp.ppNext.writeBlock(0, br) br.reset() for i := range rcs { rcs[i].resetValues() diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go index bac5475cb..c261aea05 100644 --- a/lib/logstorage/pipe_uniq.go +++ b/lib/logstorage/pipe_uniq.go @@ -63,7 +63,7 @@ func (pu *pipeUniq) initFilterInValues(cache map[string][]string, getFieldValues return pu, nil } -func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor { +func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor { maxStateSize := int64(float64(memory.Allowed()) * 0.2) shards := make([]pipeUniqProcessorShard, workersCount) @@ -81,7 +81,7 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c pu: pu, stopCh: stopCh, cancel: cancel, - ppBase: ppBase, + ppNext: ppNext, shards: shards, @@ -96,7 +96,7 @@ type pipeUniqProcessor struct { pu *pipeUniq stopCh <-chan struct{} cancel func() - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeUniqProcessorShard @@ -430,7 +430,7 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) { } } if !areEqualColumns { - // send the current block to ppBase and construct a block with new set of columns + // send the current block to ppNext and construct a block with new set of columns wctx.flush() rcs = wctx.rcs[:0] @@ -458,10 +458,10 @@ func (wctx *pipeUniqWriteContext) flush() { wctx.valuesLen = 0 - // Flush rcs to ppBase + // Flush rcs to ppNext br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 - wctx.pup.ppBase.writeBlock(0, br) + wctx.pup.ppNext.writeBlock(0, br) br.reset() for i := range rcs { rcs[i].resetValues() diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index 1f41a3936..b7861e03e 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -96,12 +96,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) { }) } -func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor, +func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppNext pipeProcessor, fromField string, fieldPrefix string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter) *pipeUnpackProcessor { return &pipeUnpackProcessor{ unpackFunc: unpackFunc, - ppBase: ppBase, + ppNext: ppNext, shards: make([]pipeUnpackProcessorShard, workersCount), @@ -115,7 +115,7 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack type pipeUnpackProcessor struct { unpackFunc func(uctx *fieldsUnpackerContext, s string) - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeUnpackProcessorShard @@ -147,7 +147,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pup.shards[workerID] - shard.wctx.init(workerID, pup.ppBase, pup.keepOriginalFields, pup.skipEmptyResults, br) + shard.wctx.init(workerID, pup.ppNext, pup.keepOriginalFields, pup.skipEmptyResults, br) shard.uctx.init(workerID, pup.fieldPrefix) bm := &shard.bm @@ -156,7 +156,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { if pup.iff != nil { pup.iff.f.applyToBlockResult(br, bm) if bm.isZero() { - pup.ppBase.writeBlock(workerID, br) + pup.ppNext.writeBlock(workerID, br) return } } @@ -204,7 +204,7 @@ func (pup *pipeUnpackProcessor) flush() error { type pipeUnpackWriteContext struct { workerID uint - ppBase pipeProcessor + ppNext pipeProcessor keepOriginalFields bool skipEmptyResults bool @@ -223,7 +223,7 @@ type pipeUnpackWriteContext struct { func (wctx *pipeUnpackWriteContext) reset() { wctx.workerID = 0 - wctx.ppBase = nil + wctx.ppNext = nil wctx.keepOriginalFields = false wctx.brSrc = nil @@ -239,11 +239,11 @@ func (wctx *pipeUnpackWriteContext) reset() { wctx.valuesLen = 0 } -func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, keepOriginalFields, skipEmptyResults bool, brSrc *blockResult) { +func (wctx *pipeUnpackWriteContext) init(workerID uint, ppNext pipeProcessor, keepOriginalFields, skipEmptyResults bool, brSrc *blockResult) { wctx.reset() wctx.workerID = workerID - wctx.ppBase = ppBase + wctx.ppNext = ppNext wctx.keepOriginalFields = keepOriginalFields wctx.skipEmptyResults = skipEmptyResults @@ -265,7 +265,7 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) { } } if !areEqualColumns { - // send the current block to ppBase and construct a block with new set of columns + // send the current block to ppNext and construct a block with new set of columns wctx.flush() rcs = wctx.rcs[:0] @@ -310,11 +310,11 @@ func (wctx *pipeUnpackWriteContext) flush() { wctx.valuesLen = 0 - // Flush rcs to ppBase + // Flush rcs to ppNext br := &wctx.br br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 - wctx.ppBase.writeBlock(wctx.workerID, br) + wctx.ppNext.writeBlock(wctx.workerID, br) br.reset() for i := range rcs { rcs[i].resetValues() diff --git a/lib/logstorage/pipe_unpack_json.go b/lib/logstorage/pipe_unpack_json.go index d2e4c5100..47eb18326 100644 --- a/lib/logstorage/pipe_unpack_json.go +++ b/lib/logstorage/pipe_unpack_json.go @@ -74,7 +74,7 @@ func (pu *pipeUnpackJSON) initFilterInValues(cache map[string][]string, getField return &puNew, nil } -func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { unpackJSON := func(uctx *fieldsUnpackerContext, s string) { if len(s) == 0 || s[0] != '{' { // This isn't a JSON object @@ -109,7 +109,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, } PutJSONParser(p) } - return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff) + return newPipeUnpackProcessor(workersCount, unpackJSON, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff) } func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go index 889565734..87b1e3575 100644 --- a/lib/logstorage/pipe_unpack_json_test.go +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -1,10 +1,6 @@ package logstorage import ( - "math/rand" - "slices" - "strings" - "sync" "testing" ) @@ -318,221 +314,6 @@ func TestPipeUnpackJSON(t *testing.T) { }) } -func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) { - t.Helper() - - lex := newLexer(pipeStr) - p, err := parsePipe(lex) - if err != nil { - t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err) - } - - workersCount := 5 - stopCh := make(chan struct{}) - cancel := func() {} - ppTest := newTestPipeProcessor() - pp := p.newPipeProcessor(workersCount, stopCh, cancel, ppTest) - - brw := newTestBlockResultWriter(workersCount, pp) - for _, row := range rows { - brw.writeRow(row) - } - brw.flush() - pp.flush() - - ppTest.expectRows(t, rowsExpected) -} - -func newTestBlockResultWriter(workersCount int, ppBase pipeProcessor) *testBlockResultWriter { - return &testBlockResultWriter{ - workersCount: workersCount, - ppBase: ppBase, - } -} - -type testBlockResultWriter struct { - workersCount int - ppBase pipeProcessor - rcs []resultColumn - br blockResult - - rowsCount int -} - -func (brw *testBlockResultWriter) writeRow(row []Field) { - if !brw.areSameFields(row) { - brw.flush() - - brw.rcs = brw.rcs[:0] - for _, field := range row { - brw.rcs = appendResultColumnWithName(brw.rcs, field.Name) - } - } - - for i, field := range row { - brw.rcs[i].addValue(field.Value) - } - brw.rowsCount++ - if rand.Intn(5) == 0 { - brw.flush() - } -} - -func (brw *testBlockResultWriter) areSameFields(row []Field) bool { - if len(brw.rcs) != len(row) { - return false - } - for i, rc := range brw.rcs { - if rc.name != row[i].Name { - return false - } - } - return true -} - -func (brw *testBlockResultWriter) flush() { - brw.br.setResultColumns(brw.rcs, brw.rowsCount) - brw.rowsCount = 0 - workerID := rand.Intn(brw.workersCount) - brw.ppBase.writeBlock(uint(workerID), &brw.br) - brw.br.reset() - for i := range brw.rcs { - brw.rcs[i].resetValues() - } -} - -func newTestPipeProcessor() *testPipeProcessor { - return &testPipeProcessor{} -} - -type testPipeProcessor struct { - resultRowsLock sync.Mutex - resultRows [][]Field -} - -func (pp *testPipeProcessor) writeBlock(_ uint, br *blockResult) { - cs := br.getColumns() - var columnValues [][]string - for _, c := range cs { - values := c.getValues(br) - columnValues = append(columnValues, values) - } - - for i := range br.timestamps { - row := make([]Field, len(columnValues)) - for j, values := range columnValues { - r := &row[j] - r.Name = strings.Clone(cs[j].name) - r.Value = strings.Clone(values[i]) - } - pp.resultRowsLock.Lock() - pp.resultRows = append(pp.resultRows, row) - pp.resultRowsLock.Unlock() - } -} - -func (pp *testPipeProcessor) flush() error { - return nil -} - -func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) { - t.Helper() - - if len(pp.resultRows) != len(expectedRows) { - t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s", - len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows)) - } - - sortTestRows(pp.resultRows) - sortTestRows(expectedRows) - - for i, resultRow := range pp.resultRows { - expectedRow := expectedRows[i] - if len(resultRow) != len(expectedRow) { - t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s", - i, len(resultRow), len(expectedRow), rowToString(resultRow), rowToString(expectedRow)) - } - for j, resultField := range resultRow { - expectedField := expectedRow[j] - if resultField.Name != expectedField.Name { - t.Fatalf("unexpected field name at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s", - i, resultField.Name, expectedField.Name, rowToString(resultRow), rowToString(expectedRow)) - } - if resultField.Value != expectedField.Value { - t.Fatalf("unexpected value for field %q at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s", - resultField.Name, i, resultField.Value, expectedField.Value, rowToString(resultRow), rowToString(expectedRow)) - } - } - } -} - -func sortTestRows(rows [][]Field) { - for _, row := range rows { - sortTestFields(row) - } - slices.SortFunc(rows, func(a, b []Field) int { - reverse := false - if len(a) > len(b) { - reverse = true - a, b = b, a - } - for i, fA := range a { - fB := b[i] - result := cmpTestFields(fA, fB) - if result == 0 { - continue - } - if reverse { - result = -result - } - return result - } - if len(a) == len(b) { - return 0 - } - if reverse { - return 1 - } - return -1 - }) -} - -func sortTestFields(fields []Field) { - slices.SortFunc(fields, cmpTestFields) -} - -func cmpTestFields(a, b Field) int { - if a.Name == b.Name { - if a.Value == b.Value { - return 0 - } - if a.Value < b.Value { - return -1 - } - return 1 - } - if a.Name < b.Name { - return -1 - } - return 1 -} - -func rowsToString(rows [][]Field) string { - a := make([]string, len(rows)) - for i, row := range rows { - a[i] = rowToString(row) - } - return strings.Join(a, "\n") -} - -func rowToString(row []Field) string { - a := make([]string, len(row)) - for i, f := range row { - a[i] = f.String() - } - return "{" + strings.Join(a, ",") + "}" -} - func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) { f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { t.Helper() diff --git a/lib/logstorage/pipe_unpack_logfmt.go b/lib/logstorage/pipe_unpack_logfmt.go index 5d69786e3..3eee8a231 100644 --- a/lib/logstorage/pipe_unpack_logfmt.go +++ b/lib/logstorage/pipe_unpack_logfmt.go @@ -72,7 +72,7 @@ func (pu *pipeUnpackLogfmt) initFilterInValues(cache map[string][]string, getFie return &puNew, nil } -func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) { p := getLogfmtParser() @@ -100,7 +100,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{} putLogfmtParser(p) } - return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff) + return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff) } diff --git a/lib/logstorage/pipe_unroll.go b/lib/logstorage/pipe_unroll.go index 0e803d264..0de54fd47 100644 --- a/lib/logstorage/pipe_unroll.go +++ b/lib/logstorage/pipe_unroll.go @@ -74,10 +74,10 @@ func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) } } -func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { +func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeUnrollProcessor{ pu: pu, - ppBase: ppBase, + ppNext: ppNext, shards: make([]pipeUnrollProcessorShard, workersCount), } @@ -85,7 +85,7 @@ func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu type pipeUnrollProcessor struct { pu *pipeUnroll - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeUnrollProcessorShard } @@ -116,7 +116,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { pu := pup.pu shard := &pup.shards[workerID] - shard.wctx.init(workerID, pup.ppBase, false, false, br) + shard.wctx.init(workerID, pup.ppNext, false, false, br) bm := &shard.bm bm.init(len(br.timestamps)) @@ -124,7 +124,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) { if iff := pu.iff; iff != nil { iff.f.applyToBlockResult(br, bm) if bm.isZero() { - pup.ppBase.writeBlock(workerID, br) + pup.ppNext.writeBlock(workerID, br) return } } diff --git a/lib/logstorage/pipe_update.go b/lib/logstorage/pipe_update.go index 490d0ab5b..718c2e37b 100644 --- a/lib/logstorage/pipe_update.go +++ b/lib/logstorage/pipe_update.go @@ -16,14 +16,14 @@ func updateNeededFieldsForUpdatePipe(neededFields, unneededFields fieldsSet, fie } } -func newPipeUpdateProcessor(workersCount int, updateFunc func(a *arena, v string) string, ppBase pipeProcessor, field string, iff *ifFilter) pipeProcessor { +func newPipeUpdateProcessor(workersCount int, updateFunc func(a *arena, v string) string, ppNext pipeProcessor, field string, iff *ifFilter) pipeProcessor { return &pipeUpdateProcessor{ updateFunc: updateFunc, field: field, iff: iff, - ppBase: ppBase, + ppNext: ppNext, shards: make([]pipeUpdateProcessorShard, workersCount), } @@ -35,7 +35,7 @@ type pipeUpdateProcessor struct { field string iff *ifFilter - ppBase pipeProcessor + ppNext pipeProcessor shards []pipeUpdateProcessorShard } @@ -67,7 +67,7 @@ func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) { if iff := pup.iff; iff != nil { iff.f.applyToBlockResult(br, bm) if bm.isZero() { - pup.ppBase.writeBlock(workerID, br) + pup.ppNext.writeBlock(workerID, br) return } } @@ -92,7 +92,7 @@ func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) { } br.addResultColumn(&shard.rc) - pup.ppBase.writeBlock(workerID, br) + pup.ppNext.writeBlock(workerID, br) shard.rc.reset() shard.a.reset() diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go new file mode 100644 index 000000000..7a6c6f7d2 --- /dev/null +++ b/lib/logstorage/pipe_utils_test.go @@ -0,0 +1,224 @@ +package logstorage + +import ( + "math/rand" + "slices" + "strings" + "sync" + "testing" +) + +func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + + lex := newLexer(pipeStr) + p, err := parsePipe(lex) + if err != nil { + t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err) + } + + workersCount := 5 + stopCh := make(chan struct{}) + cancel := func() {} + ppTest := newTestPipeProcessor() + pp := p.newPipeProcessor(workersCount, stopCh, cancel, ppTest) + + brw := newTestBlockResultWriter(workersCount, pp) + for _, row := range rows { + brw.writeRow(row) + } + brw.flush() + pp.flush() + + ppTest.expectRows(t, rowsExpected) +} + +func newTestBlockResultWriter(workersCount int, ppNext pipeProcessor) *testBlockResultWriter { + return &testBlockResultWriter{ + workersCount: workersCount, + ppNext: ppNext, + } +} + +type testBlockResultWriter struct { + workersCount int + ppNext pipeProcessor + rcs []resultColumn + br blockResult + + rowsCount int +} + +func (brw *testBlockResultWriter) writeRow(row []Field) { + if !brw.areSameFields(row) { + brw.flush() + + brw.rcs = brw.rcs[:0] + for _, field := range row { + brw.rcs = appendResultColumnWithName(brw.rcs, field.Name) + } + } + + for i, field := range row { + brw.rcs[i].addValue(field.Value) + } + brw.rowsCount++ + if rand.Intn(5) == 0 { + brw.flush() + } +} + +func (brw *testBlockResultWriter) areSameFields(row []Field) bool { + if len(brw.rcs) != len(row) { + return false + } + for i, rc := range brw.rcs { + if rc.name != row[i].Name { + return false + } + } + return true +} + +func (brw *testBlockResultWriter) flush() { + brw.br.setResultColumns(brw.rcs, brw.rowsCount) + brw.rowsCount = 0 + workerID := rand.Intn(brw.workersCount) + brw.ppNext.writeBlock(uint(workerID), &brw.br) + brw.br.reset() + for i := range brw.rcs { + brw.rcs[i].resetValues() + } +} + +func newTestPipeProcessor() *testPipeProcessor { + return &testPipeProcessor{} +} + +type testPipeProcessor struct { + resultRowsLock sync.Mutex + resultRows [][]Field +} + +func (pp *testPipeProcessor) writeBlock(_ uint, br *blockResult) { + cs := br.getColumns() + var columnValues [][]string + for _, c := range cs { + values := c.getValues(br) + columnValues = append(columnValues, values) + } + + for i := range br.timestamps { + row := make([]Field, len(columnValues)) + for j, values := range columnValues { + r := &row[j] + r.Name = strings.Clone(cs[j].name) + r.Value = strings.Clone(values[i]) + } + pp.resultRowsLock.Lock() + pp.resultRows = append(pp.resultRows, row) + pp.resultRowsLock.Unlock() + } +} + +func (pp *testPipeProcessor) flush() error { + return nil +} + +func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) { + t.Helper() + + if len(pp.resultRows) != len(expectedRows) { + t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s", + len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows)) + } + + sortTestRows(pp.resultRows) + sortTestRows(expectedRows) + + for i, resultRow := range pp.resultRows { + expectedRow := expectedRows[i] + if len(resultRow) != len(expectedRow) { + t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s", + i, len(resultRow), len(expectedRow), rowToString(resultRow), rowToString(expectedRow)) + } + for j, resultField := range resultRow { + expectedField := expectedRow[j] + if resultField.Name != expectedField.Name { + t.Fatalf("unexpected field name at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s", + i, resultField.Name, expectedField.Name, rowToString(resultRow), rowToString(expectedRow)) + } + if resultField.Value != expectedField.Value { + t.Fatalf("unexpected value for field %q at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s", + resultField.Name, i, resultField.Value, expectedField.Value, rowToString(resultRow), rowToString(expectedRow)) + } + } + } +} + +func sortTestRows(rows [][]Field) { + for _, row := range rows { + sortTestFields(row) + } + slices.SortFunc(rows, func(a, b []Field) int { + reverse := false + if len(a) > len(b) { + reverse = true + a, b = b, a + } + for i, fA := range a { + fB := b[i] + result := cmpTestFields(fA, fB) + if result == 0 { + continue + } + if reverse { + result = -result + } + return result + } + if len(a) == len(b) { + return 0 + } + if reverse { + return 1 + } + return -1 + }) +} + +func sortTestFields(fields []Field) { + slices.SortFunc(fields, cmpTestFields) +} + +func cmpTestFields(a, b Field) int { + if a.Name == b.Name { + if a.Value == b.Value { + return 0 + } + if a.Value < b.Value { + return -1 + } + return 1 + } + if a.Name < b.Name { + return -1 + } + return 1 +} + +func rowsToString(rows [][]Field) string { + a := make([]string, len(rows)) + for i, row := range rows { + a[i] = rowToString(row) + } + return strings.Join(a, "\n") +} + +func rowToString(row []Field) string { + a := make([]string, len(row)) + for i, f := range row { + a[i] = f.String() + } + return "{" + strings.Join(a, ",") + "}" +}