Aliaksandr Valialkin 2024-05-25 20:13:01 +02:00
parent 41547740f6
commit 1416b5f813
25 changed files with 338 additions and 333 deletions

View file

@@ -11,15 +11,15 @@ type pipe interface {
 	// updateNeededFields must update neededFields and unneededFields with fields it needs and not needs at the input.
 	updateNeededFields(neededFields, unneededFields fieldsSet)

-	// newPipeProcessor must return new pipeProcessor for the given ppBase.
+	// newPipeProcessor must return new pipeProcessor, which writes data to the given ppNext.
 	//
 	// workersCount is the number of goroutine workers, which will call writeBlock() method.
 	//
 	// If stopCh is closed, the returned pipeProcessor must stop performing CPU-intensive tasks which take more than a few milliseconds.
 	// It is OK to continue processing pipeProcessor calls if they take less than a few milliseconds.
 	//
-	// The returned pipeProcessor may call cancel() at any time in order to notify worker goroutines to stop sending new data to pipeProcessor.
-	newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor
+	// The returned pipeProcessor may call cancel() at any time in order to notify the caller to stop sending new data to it.
+	newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor

 	// optimize must optimize the pipe
 	optimize()
@@ -50,7 +50,7 @@ type pipeProcessor interface {
 	// cancel() may be called also when the pipeProcessor decides to stop accepting new data, even if there is no any error.
 	writeBlock(workerID uint, br *blockResult)

-	// flush must flush all the data accumulated in the pipeProcessor to the base pipeProcessor.
+	// flush must flush all the data accumulated in the pipeProcessor to the next pipeProcessor.
 	//
 	// flush is called after all the worker goroutines are stopped.
 	//
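To make the renamed contract concrete: a pipeProcessor receives blocks from worker goroutines and forwards its results to the next processor in the pipeline. The sketch below is illustrative only; Block, Processor, passthroughProcessor and printProcessor are simplified stand-ins for the package's unexported blockResult and pipeProcessor types, not the real API.

package main

import "fmt"

// Block stands in for the package's unexported blockResult.
type Block struct {
    rows []string
}

// Processor stands in for pipeProcessor: writeBlock pushes a block of rows,
// flush propagates any buffered state downstream.
type Processor interface {
    writeBlock(workerID uint, b *Block)
    flush() error
}

// passthroughProcessor mirrors the shape of the processors in this commit:
// it keeps a reference to the next processor in the pipeline (ppNext after
// the rename) and forwards every block to it.
type passthroughProcessor struct {
    ppNext Processor
}

func (pp *passthroughProcessor) writeBlock(workerID uint, b *Block) {
    // A real pipe would transform b here before forwarding it.
    pp.ppNext.writeBlock(workerID, b)
}

func (pp *passthroughProcessor) flush() error {
    // Nothing is buffered locally, so just flush the next stage.
    return pp.ppNext.flush()
}

// printProcessor terminates the chain by printing the rows it receives.
type printProcessor struct{}

func (printProcessor) writeBlock(_ uint, b *Block) { fmt.Println(b.rows) }
func (printProcessor) flush() error                { return nil }

func main() {
    p := &passthroughProcessor{ppNext: printProcessor{}}
    p.writeBlock(0, &Block{rows: []string{"row1", "row2"}})
    _ = p.flush()
}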

View file

@@ -62,16 +62,16 @@ func (pc *pipeCopy) initFilterInValues(cache map[string][]string, getFieldValues
 	return pc, nil
 }

-func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pc *pipeCopy) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeCopyProcessor{
 		pc: pc,
-		ppBase: ppBase,
+		ppNext: ppNext,
 	}
 }

 type pipeCopyProcessor struct {
 	pc *pipeCopy
-	ppBase pipeProcessor
+	ppNext pipeProcessor
 }

 func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -80,7 +80,7 @@ func (pcp *pipeCopyProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	br.copyColumns(pcp.pc.srcFields, pcp.pc.dstFields)
-	pcp.ppBase.writeBlock(workerID, br)
+	pcp.ppNext.writeBlock(workerID, br)
 }

 func (pcp *pipeCopyProcessor) flush() error {

View file

@@ -44,16 +44,16 @@ func (pd *pipeDelete) initFilterInValues(cache map[string][]string, getFieldValu
 	return pd, nil
 }

-func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pd *pipeDelete) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeDeleteProcessor{
 		pd: pd,
-		ppBase: ppBase,
+		ppNext: ppNext,
 	}
 }

 type pipeDeleteProcessor struct {
 	pd *pipeDelete
-	ppBase pipeProcessor
+	ppNext pipeProcessor
 }

 func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -62,7 +62,7 @@ func (pdp *pipeDeleteProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	br.deleteColumns(pdp.pd.fields)
-	pdp.ppBase.writeBlock(workerID, br)
+	pdp.ppNext.writeBlock(workerID, br)
 }

 func (pdp *pipeDeleteProcessor) flush() error {

View file

@@ -101,10 +101,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
 	}
 }

-func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeExtractProcessor{
 		pe: pe,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: make([]pipeExtractProcessorShard, workersCount),
 	}
@@ -112,7 +112,7 @@ func (pe *pipeExtract) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f
 type pipeExtractProcessor struct {
 	pe *pipeExtract
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeExtractProcessorShard
 }
@@ -149,7 +149,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
 	if iff := pe.iff; iff != nil {
 		iff.f.applyToBlockResult(br, bm)
 		if bm.isZero() {
-			pep.ppBase.writeBlock(workerID, br)
+			pep.ppNext.writeBlock(workerID, br)
 			return
 		}
 	}
@@ -214,7 +214,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
 	for i := range rcs {
 		br.addResultColumn(&rcs[i])
 	}
-	pep.ppBase.writeBlock(workerID, br)
+	pep.ppNext.writeBlock(workerID, br)

 	for i := range rcs {
 		rcs[i].reset()

View file

@@ -49,13 +49,13 @@ func (pf *pipeFieldNames) initFilterInValues(cache map[string][]string, getField
 	return pf, nil
 }

-func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	shards := make([]pipeFieldNamesProcessorShard, workersCount)

 	pfp := &pipeFieldNamesProcessor{
 		pf: pf,
 		stopCh: stopCh,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: shards,
 	}
@@ -65,7 +65,7 @@ func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struc
 type pipeFieldNamesProcessor struct {
 	pf *pipeFieldNames
 	stopCh <-chan struct{}
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeFieldNamesProcessorShard
 }
@@ -184,10 +184,10 @@ func (wctx *pipeFieldNamesWriteContext) flush() {
 	wctx.valuesLen = 0

-	// Flush rcs to ppBase
+	// Flush rcs to ppNext
 	br.setResultColumns(wctx.rcs[:], wctx.rowsCount)
 	wctx.rowsCount = 0
-	wctx.pfp.ppBase.writeBlock(0, br)
+	wctx.pfp.ppNext.writeBlock(0, br)
 	br.reset()
 	wctx.rcs[0].resetValues()
 	wctx.rcs[1].resetValues()

View file

@@ -61,16 +61,16 @@ func (pf *pipeFields) initFilterInValues(cache map[string][]string, getFieldValu
 	return pf, nil
 }

-func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pf *pipeFields) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeFieldsProcessor{
 		pf: pf,
-		ppBase: ppBase,
+		ppNext: ppNext,
 	}
 }

 type pipeFieldsProcessor struct {
 	pf *pipeFields
-	ppBase pipeProcessor
+	ppNext pipeProcessor
 }

 func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -81,7 +81,7 @@ func (pfp *pipeFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
 	if !pfp.pf.containsStar {
 		br.setColumns(pfp.pf.fields)
 	}
-	pfp.ppBase.writeBlock(workerID, br)
+	pfp.ppNext.writeBlock(workerID, br)
 }

 func (pfp *pipeFieldsProcessor) flush() error {

View file

@@ -47,12 +47,12 @@ func (pf *pipeFilter) initFilterInValues(cache map[string][]string, getFieldValu
 	return &pfNew, nil
 }

-func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	shards := make([]pipeFilterProcessorShard, workersCount)

 	pfp := &pipeFilterProcessor{
 		pf: pf,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: shards,
 	}
@@ -61,7 +61,7 @@ func (pf *pipeFilter) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu
 type pipeFilterProcessor struct {
 	pf *pipeFilter
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeFilterProcessorShard
 }
@@ -90,8 +90,8 @@ func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) {
 	bm.setBits()
 	pfp.pf.f.applyToBlockResult(br, bm)
 	if bm.areAllBitsSet() {
-		// Fast path - the filter didn't filter out anything - send br to the base pipe as is.
-		pfp.ppBase.writeBlock(workerID, br)
+		// Fast path - the filter didn't filter out anything - send br to the next pipe as is.
+		pfp.ppNext.writeBlock(workerID, br)
 		return
 	}
 	if bm.isZero() {
@@ -99,9 +99,9 @@ func (pfp *pipeFilterProcessor) writeBlock(workerID uint, br *blockResult) {
 		return
 	}

-	// Slow path - copy the remaining rows from br to shard.br before sending them to base pipe.
+	// Slow path - copy the remaining rows from br to shard.br before sending them to the next pipe.
 	shard.br.initFromFilterAllColumns(br, bm)
-	pfp.ppBase.writeBlock(workerID, &shard.br)
+	pfp.ppNext.writeBlock(workerID, &shard.br)
 }

 func (pfp *pipeFilterProcessor) flush() error {
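The fast/slow-path split above generalizes: if the filter keeps every row, forward the original block untouched; if it keeps none, drop the block; otherwise copy the survivors into a scratch block before forwarding. A self-contained sketch of that three-way decision, with a plain []bool mask standing in for the package's bitmap type (filterRows and emit are illustrative names):

package main

import "fmt"

// filterRows mirrors the logic of pipeFilterProcessor.writeBlock.
func filterRows(rows []string, keep []bool, emit func([]string)) {
    all, none := true, true
    for _, k := range keep {
        if k {
            none = false
        } else {
            all = false
        }
    }
    if all {
        // Fast path: nothing was filtered out, forward the block as-is.
        emit(rows)
        return
    }
    if none {
        // Nothing survived the filter; drop the block entirely.
        return
    }
    // Slow path: copy the surviving rows into a fresh block, then forward it.
    kept := make([]string, 0, len(rows))
    for i, row := range rows {
        if keep[i] {
            kept = append(kept, row)
        }
    }
    emit(kept)
}

func main() {
    emit := func(rows []string) { fmt.Println(rows) }
    filterRows([]string{"a", "b", "c"}, []bool{true, false, true}, emit) // [a c]
    filterRows([]string{"a", "b"}, []bool{true, true}, emit)             // [a b]
}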

View file

@@ -90,10 +90,10 @@ func (pf *pipeFormat) initFilterInValues(cache map[string][]string, getFieldValu
 	return &pfNew, nil
 }

-func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeFormatProcessor{
 		pf: pf,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: make([]pipeFormatProcessorShard, workersCount),
 	}
@@ -101,7 +101,7 @@ func (pf *pipeFormat) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu
 type pipeFormatProcessor struct {
 	pf *pipeFormat
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeFormatProcessorShard
 }
@@ -134,7 +134,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
 	if iff := pf.iff; iff != nil {
 		iff.f.applyToBlockResult(br, bm)
 		if bm.isZero() {
-			pfp.ppBase.writeBlock(workerID, br)
+			pfp.ppNext.writeBlock(workerID, br)
 			return
 		}
 	}
@@ -158,7 +158,7 @@ func (pfp *pipeFormatProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	br.addResultColumn(&shard.rc)
-	pfp.ppBase.writeBlock(workerID, br)
+	pfp.ppNext.writeBlock(workerID, br)

 	shard.a.reset()
 	shard.rc.reset()

View file

@@ -32,7 +32,7 @@ func (pl *pipeLimit) initFilterInValues(cache map[string][]string, getFieldValue
 	return pl, nil
 }

-func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
 	if pl.limit == 0 {
 		// Special case - notify the caller to stop writing data to the returned pipeLimitProcessor
 		cancel()
@@ -40,14 +40,14 @@ func (pl *pipeLimit) newPipeProcessor(_ int, _ <-chan struct{}, cancel func(), p
 	return &pipeLimitProcessor{
 		pl: pl,
 		cancel: cancel,
-		ppBase: ppBase,
+		ppNext: ppNext,
 	}
 }

 type pipeLimitProcessor struct {
 	pl *pipeLimit
 	cancel func()
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	rowsProcessed atomic.Uint64
 }
@@ -59,8 +59,8 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
 	rowsProcessed := plp.rowsProcessed.Add(uint64(len(br.timestamps)))
 	if rowsProcessed <= plp.pl.limit {
-		// Fast path - write all the rows to ppBase.
-		plp.ppBase.writeBlock(workerID, br)
+		// Fast path - write all the rows to ppNext.
+		plp.ppNext.writeBlock(workerID, br)
 		return
 	}
@@ -74,7 +74,7 @@ func (plp *pipeLimitProcessor) writeBlock(workerID uint, br *blockResult) {
 	// Write remaining rows.
 	keepRows := plp.pl.limit - rowsProcessed
 	br.truncateRows(int(keepRows))
-	plp.ppBase.writeBlock(workerID, br)
+	plp.ppNext.writeBlock(workerID, br)

 	// Notify the caller that it should stop passing more data to writeBlock().
 	plp.cancel()
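The limit accounting above hinges on one atomic counter shared by all workers: each block atomically reserves its row count, and the worker that crosses the limit truncates its block and calls cancel(). A condensed, self-contained rendering of the same pattern (limiter and writeRows are illustrative names; the real processor operates on blockResult blocks, not string slices):

package main

import (
    "fmt"
    "sync/atomic"
)

// limiter caps the total number of rows forwarded across concurrent workers.
type limiter struct {
    limit         uint64
    rowsProcessed atomic.Uint64
    cancel        func() // tells the caller to stop sending data
}

// writeRows returns the prefix of rows that still fits under the limit.
func (l *limiter) writeRows(rows []string) []string {
    total := l.rowsProcessed.Add(uint64(len(rows)))
    if total <= l.limit {
        // Fast path: the whole block fits under the limit.
        return rows
    }
    seenBefore := total - uint64(len(rows))
    if seenBefore >= l.limit {
        // The limit was already reached; drop the block and stop the caller.
        l.cancel()
        return nil
    }
    // Keep only the rows that still fit, then stop the caller.
    kept := rows[:l.limit-seenBefore]
    l.cancel()
    return kept
}

func main() {
    l := &limiter{limit: 4, cancel: func() { fmt.Println("cancel()") }}
    fmt.Println(l.writeRows([]string{"r1", "r2", "r3"})) // [r1 r2 r3]
    fmt.Println(l.writeRows([]string{"r4", "r5"}))       // cancel(), then [r4]
}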

View file

@@ -32,16 +32,16 @@ func (po *pipeOffset) initFilterInValues(cache map[string][]string, getFieldValu
 	return po, nil
 }

-func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (po *pipeOffset) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeOffsetProcessor{
 		po: po,
-		ppBase: ppBase,
+		ppNext: ppNext,
 	}
 }

 type pipeOffsetProcessor struct {
 	po *pipeOffset
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	rowsProcessed atomic.Uint64
 }
@@ -58,13 +58,13 @@ func (pop *pipeOffsetProcessor) writeBlock(workerID uint, br *blockResult) {
 	rowsProcessed -= uint64(len(br.timestamps))
 	if rowsProcessed >= pop.po.offset {
-		pop.ppBase.writeBlock(workerID, br)
+		pop.ppNext.writeBlock(workerID, br)
 		return
 	}

 	rowsSkip := pop.po.offset - rowsProcessed
 	br.skipRows(int(rowsSkip))
-	pop.ppBase.writeBlock(workerID, br)
+	pop.ppNext.writeBlock(workerID, br)
 }

 func (pop *pipeOffsetProcessor) flush() error {

View file

@@ -46,10 +46,10 @@ func (pp *pipePackJSON) initFilterInValues(cache map[string][]string, getFieldVa
 	return pp, nil
 }

-func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipePackJSONProcessor{
 		pp: pp,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: make([]pipePackJSONProcessorShard, workersCount),
 	}
@@ -57,7 +57,7 @@ func (pp *pipePackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _
 type pipePackJSONProcessor struct {
 	pp *pipePackJSON
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipePackJSONProcessorShard
 }
@@ -106,7 +106,7 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	br.addResultColumn(&shard.rc)
-	ppp.ppBase.writeBlock(workerID, br)
+	ppp.ppNext.writeBlock(workerID, br)

 	shard.rc.reset()
 }

View file

@@ -66,16 +66,16 @@ func (pr *pipeRename) initFilterInValues(cache map[string][]string, getFieldValu
 	return pr, nil
 }

-func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pr *pipeRename) newPipeProcessor(_ int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeRenameProcessor{
 		pr: pr,
-		ppBase: ppBase,
+		ppNext: ppNext,
 	}
 }

 type pipeRenameProcessor struct {
 	pr *pipeRename
-	ppBase pipeProcessor
+	ppNext pipeProcessor
 }

 func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) {
@@ -84,7 +84,7 @@ func (prp *pipeRenameProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	br.renameColumns(prp.pr.srcFields, prp.pr.dstFields)
-	prp.ppBase.writeBlock(workerID, br)
+	prp.ppNext.writeBlock(workerID, br)
 }

 func (prp *pipeRenameProcessor) flush() error {

View file

@@ -57,7 +57,7 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal
 	return &peNew, nil
 }

-func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	updateFunc := func(a *arena, v string) string {
 		bb := bbPool.Get()
 		bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit)
@@ -66,7 +66,7 @@ func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ f
 		return result
 	}

-	return newPipeUpdateProcessor(workersCount, updateFunc, ppBase, pr.field, pr.iff)
+	return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
 }

 func parsePipeReplace(lex *lexer) (*pipeReplace, error) {

View file

@@ -57,7 +57,7 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi
 	return &peNew, nil
 }

-func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	updateFunc := func(a *arena, v string) string {
 		bb := bbPool.Get()
 		bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit)
@@ -66,7 +66,7 @@ func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{
 		return result
 	}

-	return newPipeUpdateProcessor(workersCount, updateFunc, ppBase, pr.field, pr.iff)
+	return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
 }

View file

@@ -79,14 +79,14 @@ func (ps *pipeSort) initFilterInValues(cache map[string][]string, getFieldValues
 	return ps, nil
 }

-func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func (ps *pipeSort) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
 	if ps.limit > 0 {
-		return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppBase)
+		return newPipeTopkProcessor(ps, workersCount, stopCh, cancel, ppNext)
 	}
-	return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppBase)
+	return newPipeSortProcessor(ps, workersCount, stopCh, cancel, ppNext)
 }

-func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
 	maxStateSize := int64(float64(memory.Allowed()) * 0.2)

 	shards := make([]pipeSortProcessorShard, workersCount)
@@ -104,7 +104,7 @@ func newPipeSortProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}
 		ps: ps,
 		stopCh: stopCh,
 		cancel: cancel,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: shards,
@@ -119,7 +119,7 @@ type pipeSortProcessor struct {
 	ps *pipeSort
 	stopCh <-chan struct{}
 	cancel func()
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeSortProcessorShard
@@ -534,7 +534,7 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to ppBase and construct a block with new set of columns
+		// send the current block to ppNext and construct a block with new set of columns
 		wctx.flush()

 		rcs = wctx.rcs[:0]
@@ -573,10 +573,10 @@ func (wctx *pipeSortWriteContext) flush() {
 	wctx.valuesLen = 0

-	// Flush rcs to ppBase
+	// Flush rcs to ppNext
 	br.setResultColumns(rcs, wctx.rowsCount)
 	wctx.rowsCount = 0
-	wctx.psp.ppBase.writeBlock(0, br)
+	wctx.psp.ppNext.writeBlock(0, br)
 	br.reset()
 	for i := range rcs {
 		rcs[i].resetValues()
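The writeNextRow/flush pair above follows a pattern that recurs across the sort, topk, uniq and unpack write contexts in this commit: rows accumulate column-wise, and the pending block is flushed to the next processor whenever the incoming row's column set differs from the current one. A simplified stand-alone sketch of that batching logic (writeContext here is a toy, not the package's pipeSortWriteContext):

package main

import "fmt"

// writeContext batches rows into per-column value slices and flushes the
// accumulated block whenever the column schema changes.
type writeContext struct {
    names  []string   // current column names
    values [][]string // one slice of values per column
    emit   func(names []string, values [][]string)
}

func (wctx *writeContext) writeRow(names, row []string) {
    same := len(names) == len(wctx.names)
    for i := 0; same && i < len(names); i++ {
        same = names[i] == wctx.names[i]
    }
    if !same {
        // Send the current block downstream, then start a block
        // with the new set of columns.
        wctx.flush()
        wctx.names = append([]string(nil), names...)
        wctx.values = make([][]string, len(names))
    }
    for i, v := range row {
        wctx.values[i] = append(wctx.values[i], v)
    }
}

func (wctx *writeContext) flush() {
    if len(wctx.names) == 0 || len(wctx.values[0]) == 0 {
        return
    }
    wctx.emit(wctx.names, wctx.values)
    for i := range wctx.values {
        wctx.values[i] = nil
    }
}

func main() {
    wctx := &writeContext{emit: func(n []string, v [][]string) { fmt.Println(n, v) }}
    wctx.writeRow([]string{"a", "b"}, []string{"1", "2"})
    wctx.writeRow([]string{"a", "b"}, []string{"3", "4"})
    wctx.writeRow([]string{"a"}, []string{"5"}) // schema change triggers a flush
    wctx.flush()
}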

View file

@@ -148,7 +148,7 @@ func (ps *pipeStats) initFilterInValues(cache map[string][]string, getFieldValue

 const stateSizeBudgetChunk = 1 << 20

-func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
 	maxStateSize := int64(float64(memory.Allowed()) * 0.3)

 	shards := make([]pipeStatsProcessorShard, workersCount)
@@ -167,7 +167,7 @@ func (ps *pipeStats) newPipeProcessor(workersCount int, stopCh <-chan struct{},
 		ps: ps,
 		stopCh: stopCh,
 		cancel: cancel,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: shards,
@@ -182,7 +182,7 @@ type pipeStatsProcessor struct {
 	ps *pipeStats
 	stopCh <-chan struct{}
 	cancel func()
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeStatsProcessorShard
@@ -459,7 +459,7 @@ func (psp *pipeStatsProcessor) flush() error {
 		}
 	}

-	// Write per-group states to ppBase
+	// Write per-group states to ppNext
 	byFields := psp.ps.byFields
 	if len(byFields) == 0 && len(m) == 0 {
 		// Special case - zero matching rows.
@@ -519,7 +519,7 @@ func (psp *pipeStatsProcessor) flush() error {
 		if valuesLen >= 1_000_000 {
 			br.setResultColumns(rcs, rowsCount)
 			rowsCount = 0
-			psp.ppBase.writeBlock(0, &br)
+			psp.ppNext.writeBlock(0, &br)
 			br.reset()
 			for i := range rcs {
 				rcs[i].resetValues()
@@ -529,7 +529,7 @@ func (psp *pipeStatsProcessor) flush() error {
 	}

 	br.setResultColumns(rcs, rowsCount)
-	psp.ppBase.writeBlock(0, &br)
+	psp.ppNext.writeBlock(0, &br)

 	return nil
 }

View file

@@ -13,7 +13,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
 )

-func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
 	maxStateSize := int64(float64(memory.Allowed()) * 0.2)

 	shards := make([]pipeTopkProcessorShard, workersCount)
@@ -31,7 +31,7 @@ func newPipeTopkProcessor(ps *pipeSort, workersCount int, stopCh <-chan struct{}
 		ps: ps,
 		stopCh: stopCh,
 		cancel: cancel,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: shards,
@@ -46,7 +46,7 @@ type pipeTopkProcessor struct {
 	ps *pipeSort
 	stopCh <-chan struct{}
 	cancel func()
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeTopkProcessorShard
@@ -464,7 +464,7 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to ppBase and construct a block with new set of columns
+		// send the current block to ppNext and construct a block with new set of columns
 		wctx.flush()

 		rcs = wctx.rcs[:0]
@@ -508,10 +508,10 @@ func (wctx *pipeTopkWriteContext) flush() {
 	wctx.valuesLen = 0

-	// Flush rcs to ppBase
+	// Flush rcs to ppNext
 	br.setResultColumns(rcs, wctx.rowsCount)
 	wctx.rowsCount = 0
-	wctx.ptp.ppBase.writeBlock(0, br)
+	wctx.ptp.ppNext.writeBlock(0, br)
 	br.reset()
 	for i := range rcs {
 		rcs[i].resetValues()

View file

@@ -63,7 +63,7 @@ func (pu *pipeUniq) initFilterInValues(cache map[string][]string, getFieldValues
 	return pu, nil
 }

-func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppBase pipeProcessor) pipeProcessor {
+func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
 	maxStateSize := int64(float64(memory.Allowed()) * 0.2)

 	shards := make([]pipeUniqProcessorShard, workersCount)
@@ -81,7 +81,7 @@ func (pu *pipeUniq) newPipeProcessor(workersCount int, stopCh <-chan struct{}, c
 		pu: pu,
 		stopCh: stopCh,
 		cancel: cancel,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: shards,
@@ -96,7 +96,7 @@ type pipeUniqProcessor struct {
 	pu *pipeUniq
 	stopCh <-chan struct{}
 	cancel func()
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeUniqProcessorShard
@@ -430,7 +430,7 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to ppBase and construct a block with new set of columns
+		// send the current block to ppNext and construct a block with new set of columns
 		wctx.flush()

 		rcs = wctx.rcs[:0]
@@ -458,10 +458,10 @@ func (wctx *pipeUniqWriteContext) flush() {
 	wctx.valuesLen = 0

-	// Flush rcs to ppBase
+	// Flush rcs to ppNext
 	br.setResultColumns(rcs, wctx.rowsCount)
 	wctx.rowsCount = 0
-	wctx.pup.ppBase.writeBlock(0, br)
+	wctx.pup.ppNext.writeBlock(0, br)
 	br.reset()
 	for i := range rcs {
 		rcs[i].resetValues()

View file

@@ -96,12 +96,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) {
 	})
 }

-func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppBase pipeProcessor,
+func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpackerContext, s string), ppNext pipeProcessor,
 	fromField string, fieldPrefix string, keepOriginalFields, skipEmptyResults bool, iff *ifFilter) *pipeUnpackProcessor {

 	return &pipeUnpackProcessor{
 		unpackFunc: unpackFunc,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: make([]pipeUnpackProcessorShard, workersCount),
@@ -115,7 +115,7 @@ func newPipeUnpackProcessor(workersCount int, unpackFunc func(uctx *fieldsUnpack
 type pipeUnpackProcessor struct {
 	unpackFunc func(uctx *fieldsUnpackerContext, s string)
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeUnpackProcessorShard
@@ -147,7 +147,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	shard := &pup.shards[workerID]
-	shard.wctx.init(workerID, pup.ppBase, pup.keepOriginalFields, pup.skipEmptyResults, br)
+	shard.wctx.init(workerID, pup.ppNext, pup.keepOriginalFields, pup.skipEmptyResults, br)
 	shard.uctx.init(workerID, pup.fieldPrefix)

 	bm := &shard.bm
@@ -156,7 +156,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
 	if pup.iff != nil {
 		pup.iff.f.applyToBlockResult(br, bm)
 		if bm.isZero() {
-			pup.ppBase.writeBlock(workerID, br)
+			pup.ppNext.writeBlock(workerID, br)
 			return
 		}
 	}
@@ -204,7 +204,7 @@ func (pup *pipeUnpackProcessor) flush() error {

 type pipeUnpackWriteContext struct {
 	workerID uint
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	keepOriginalFields bool
 	skipEmptyResults bool
@@ -223,7 +223,7 @@ type pipeUnpackWriteContext struct {
 func (wctx *pipeUnpackWriteContext) reset() {
 	wctx.workerID = 0
-	wctx.ppBase = nil
+	wctx.ppNext = nil
 	wctx.keepOriginalFields = false

 	wctx.brSrc = nil
@@ -239,11 +239,11 @@ func (wctx *pipeUnpackWriteContext) reset() {
 	wctx.valuesLen = 0
 }

-func (wctx *pipeUnpackWriteContext) init(workerID uint, ppBase pipeProcessor, keepOriginalFields, skipEmptyResults bool, brSrc *blockResult) {
+func (wctx *pipeUnpackWriteContext) init(workerID uint, ppNext pipeProcessor, keepOriginalFields, skipEmptyResults bool, brSrc *blockResult) {
 	wctx.reset()

 	wctx.workerID = workerID
-	wctx.ppBase = ppBase
+	wctx.ppNext = ppNext
 	wctx.keepOriginalFields = keepOriginalFields
 	wctx.skipEmptyResults = skipEmptyResults
@@ -265,7 +265,7 @@ func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
 		}
 	}
 	if !areEqualColumns {
-		// send the current block to ppBase and construct a block with new set of columns
+		// send the current block to ppNext and construct a block with new set of columns
 		wctx.flush()

 		rcs = wctx.rcs[:0]
@@ -310,11 +310,11 @@ func (wctx *pipeUnpackWriteContext) flush() {
 	wctx.valuesLen = 0

-	// Flush rcs to ppBase
+	// Flush rcs to ppNext
 	br := &wctx.br
 	br.setResultColumns(rcs, wctx.rowsCount)
 	wctx.rowsCount = 0
-	wctx.ppBase.writeBlock(wctx.workerID, br)
+	wctx.ppNext.writeBlock(wctx.workerID, br)
 	br.reset()
 	for i := range rcs {
 		rcs[i].resetValues()

View file

@@ -74,7 +74,7 @@ func (pu *pipeUnpackJSON) initFilterInValues(cache map[string][]string, getField
 	return &puNew, nil
 }

-func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	unpackJSON := func(uctx *fieldsUnpackerContext, s string) {
 		if len(s) == 0 || s[0] != '{' {
 			// This isn't a JSON object
@@ -109,7 +109,7 @@ func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, _ <-chan struct{},
 		}
 		PutJSONParser(p)
 	}
-	return newPipeUnpackProcessor(workersCount, unpackJSON, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff)
+	return newPipeUnpackProcessor(workersCount, unpackJSON, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff)
 }

 func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) {

View file

@@ -1,10 +1,6 @@
 package logstorage

 import (
-	"math/rand"
-	"slices"
-	"strings"
-	"sync"
 	"testing"
 )
@@ -318,221 +314,6 @@ func TestPipeUnpackJSON(t *testing.T) {
 	})
 }

-func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) {
-	t.Helper()
-
-	lex := newLexer(pipeStr)
-	p, err := parsePipe(lex)
-	if err != nil {
-		t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err)
-	}
-
-	workersCount := 5
-	stopCh := make(chan struct{})
-	cancel := func() {}
-	ppTest := newTestPipeProcessor()
-	pp := p.newPipeProcessor(workersCount, stopCh, cancel, ppTest)
-
-	brw := newTestBlockResultWriter(workersCount, pp)
-	for _, row := range rows {
-		brw.writeRow(row)
-	}
-	brw.flush()
-	pp.flush()
-
-	ppTest.expectRows(t, rowsExpected)
-}
-
-func newTestBlockResultWriter(workersCount int, ppBase pipeProcessor) *testBlockResultWriter {
-	return &testBlockResultWriter{
-		workersCount: workersCount,
-		ppBase: ppBase,
-	}
-}
-
-type testBlockResultWriter struct {
-	workersCount int
-	ppBase pipeProcessor
-	rcs []resultColumn
-	br blockResult
-
-	rowsCount int
-}
-
-func (brw *testBlockResultWriter) writeRow(row []Field) {
-	if !brw.areSameFields(row) {
-		brw.flush()
-
-		brw.rcs = brw.rcs[:0]
-		for _, field := range row {
-			brw.rcs = appendResultColumnWithName(brw.rcs, field.Name)
-		}
-	}
-
-	for i, field := range row {
-		brw.rcs[i].addValue(field.Value)
-	}
-	brw.rowsCount++
-	if rand.Intn(5) == 0 {
-		brw.flush()
-	}
-}
-
-func (brw *testBlockResultWriter) areSameFields(row []Field) bool {
-	if len(brw.rcs) != len(row) {
-		return false
-	}
-	for i, rc := range brw.rcs {
-		if rc.name != row[i].Name {
-			return false
-		}
-	}
-	return true
-}
-
-func (brw *testBlockResultWriter) flush() {
-	brw.br.setResultColumns(brw.rcs, brw.rowsCount)
-	brw.rowsCount = 0
-	workerID := rand.Intn(brw.workersCount)
-	brw.ppBase.writeBlock(uint(workerID), &brw.br)
-	brw.br.reset()
-	for i := range brw.rcs {
-		brw.rcs[i].resetValues()
-	}
-}
-
-func newTestPipeProcessor() *testPipeProcessor {
-	return &testPipeProcessor{}
-}
-
-type testPipeProcessor struct {
-	resultRowsLock sync.Mutex
-	resultRows [][]Field
-}
-
-func (pp *testPipeProcessor) writeBlock(_ uint, br *blockResult) {
-	cs := br.getColumns()
-	var columnValues [][]string
-	for _, c := range cs {
-		values := c.getValues(br)
-		columnValues = append(columnValues, values)
-	}
-
-	for i := range br.timestamps {
-		row := make([]Field, len(columnValues))
-		for j, values := range columnValues {
-			r := &row[j]
-			r.Name = strings.Clone(cs[j].name)
-			r.Value = strings.Clone(values[i])
-		}
-		pp.resultRowsLock.Lock()
-		pp.resultRows = append(pp.resultRows, row)
-		pp.resultRowsLock.Unlock()
-	}
-}
-
-func (pp *testPipeProcessor) flush() error {
-	return nil
-}
-
-func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
-	t.Helper()
-
-	if len(pp.resultRows) != len(expectedRows) {
-		t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s",
-			len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows))
-	}
-
-	sortTestRows(pp.resultRows)
-	sortTestRows(expectedRows)
-
-	for i, resultRow := range pp.resultRows {
-		expectedRow := expectedRows[i]
-		if len(resultRow) != len(expectedRow) {
-			t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s",
-				i, len(resultRow), len(expectedRow), rowToString(resultRow), rowToString(expectedRow))
-		}
-
-		for j, resultField := range resultRow {
-			expectedField := expectedRow[j]
-			if resultField.Name != expectedField.Name {
-				t.Fatalf("unexpected field name at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s",
-					i, resultField.Name, expectedField.Name, rowToString(resultRow), rowToString(expectedRow))
-			}
-			if resultField.Value != expectedField.Value {
-				t.Fatalf("unexpected value for field %q at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s",
-					resultField.Name, i, resultField.Value, expectedField.Value, rowToString(resultRow), rowToString(expectedRow))
-			}
-		}
-	}
-}
-
-func sortTestRows(rows [][]Field) {
-	for _, row := range rows {
-		sortTestFields(row)
-	}
-	slices.SortFunc(rows, func(a, b []Field) int {
-		reverse := false
-		if len(a) > len(b) {
-			reverse = true
-			a, b = b, a
-		}
-		for i, fA := range a {
-			fB := b[i]
-			result := cmpTestFields(fA, fB)
-			if result == 0 {
-				continue
-			}
-			if reverse {
-				result = -result
-			}
-			return result
-		}
-		if len(a) == len(b) {
-			return 0
-		}
-		if reverse {
-			return 1
-		}
-		return -1
-	})
-}
-
-func sortTestFields(fields []Field) {
-	slices.SortFunc(fields, cmpTestFields)
-}
-
-func cmpTestFields(a, b Field) int {
-	if a.Name == b.Name {
-		if a.Value == b.Value {
-			return 0
-		}
-		if a.Value < b.Value {
-			return -1
-		}
-		return 1
-	}
-	if a.Name < b.Name {
-		return -1
-	}
-	return 1
-}
-
-func rowsToString(rows [][]Field) string {
-	a := make([]string, len(rows))
-	for i, row := range rows {
-		a[i] = rowToString(row)
-	}
-	return strings.Join(a, "\n")
-}
-
-func rowToString(row []Field) string {
-	a := make([]string, len(row))
-	for i, f := range row {
-		a[i] = f.String()
-	}
-	return "{" + strings.Join(a, ",") + "}"
-}
-
 func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) {
 	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
 		t.Helper()

View file

@@ -72,7 +72,7 @@ func (pu *pipeUnpackLogfmt) initFilterInValues(cache map[string][]string, getFie
 	return &puNew, nil
 }

-func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	unpackLogfmt := func(uctx *fieldsUnpackerContext, s string) {
 		p := getLogfmtParser()
@@ -100,7 +100,7 @@ func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}
 		putLogfmtParser(p)
 	}

-	return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppBase, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff)
+	return newPipeUnpackProcessor(workersCount, unpackLogfmt, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, pu.skipEmptyResults, pu.iff)
 }

View file

@@ -74,10 +74,10 @@ func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet)
 	}
 }

-func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
+func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	return &pipeUnrollProcessor{
 		pu: pu,
-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: make([]pipeUnrollProcessorShard, workersCount),
 	}
@@ -85,7 +85,7 @@ func (pu *pipeUnroll) newPipeProcessor(workersCount int, _ <-chan struct{}, _ fu
 type pipeUnrollProcessor struct {
 	pu *pipeUnroll
-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeUnrollProcessorShard
 }
@@ -116,7 +116,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
 	pu := pup.pu
 	shard := &pup.shards[workerID]
-	shard.wctx.init(workerID, pup.ppBase, false, false, br)
+	shard.wctx.init(workerID, pup.ppNext, false, false, br)

 	bm := &shard.bm
 	bm.init(len(br.timestamps))
@@ -124,7 +124,7 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
 	if iff := pu.iff; iff != nil {
 		iff.f.applyToBlockResult(br, bm)
 		if bm.isZero() {
-			pup.ppBase.writeBlock(workerID, br)
+			pup.ppNext.writeBlock(workerID, br)
 			return
 		}
 	}

View file

@@ -16,14 +16,14 @@ func updateNeededFieldsForUpdatePipe(neededFields, unneededFields fieldsSet, fie
 	}
 }

-func newPipeUpdateProcessor(workersCount int, updateFunc func(a *arena, v string) string, ppBase pipeProcessor, field string, iff *ifFilter) pipeProcessor {
+func newPipeUpdateProcessor(workersCount int, updateFunc func(a *arena, v string) string, ppNext pipeProcessor, field string, iff *ifFilter) pipeProcessor {
 	return &pipeUpdateProcessor{
 		updateFunc: updateFunc,

 		field: field,
 		iff: iff,

-		ppBase: ppBase,
+		ppNext: ppNext,

 		shards: make([]pipeUpdateProcessorShard, workersCount),
 	}
@@ -35,7 +35,7 @@ type pipeUpdateProcessor struct {
 	field string
 	iff *ifFilter

-	ppBase pipeProcessor
+	ppNext pipeProcessor

 	shards []pipeUpdateProcessorShard
 }
@@ -67,7 +67,7 @@ func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) {
 	if iff := pup.iff; iff != nil {
 		iff.f.applyToBlockResult(br, bm)
 		if bm.isZero() {
-			pup.ppBase.writeBlock(workerID, br)
+			pup.ppNext.writeBlock(workerID, br)
 			return
 		}
 	}
@@ -92,7 +92,7 @@ func (pup *pipeUpdateProcessor) writeBlock(workerID uint, br *blockResult) {
 	}

 	br.addResultColumn(&shard.rc)
-	pup.ppBase.writeBlock(workerID, br)
+	pup.ppNext.writeBlock(workerID, br)

 	shard.rc.reset()
 	shard.a.reset()
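pipeReplace and pipeReplaceRegexp plug into this processor by supplying an updateFunc that rewrites one field value at a time. The sketch below shows the shape of such a function; the arena type here is a deliberately simplified stand-in for the package's allocator (the real one hands out views into its buffer without the extra copy), and newReplaceUpdateFunc is a hypothetical helper, not part of this commit:

package main

import (
    "fmt"
    "strings"
)

// arena is a simplified stand-in for the package's byte arena: it appends
// results into one backing buffer so per-row values share an allocation.
type arena struct {
    buf []byte
}

func (a *arena) copyString(b []byte) string {
    start := len(a.buf)
    a.buf = append(a.buf, b...)
    // The real arena returns a view into buf; string() copies here to keep
    // this sketch safe and dependency-free.
    return string(a.buf[start:])
}

// newReplaceUpdateFunc mirrors how pipeReplace builds the updateFunc it
// passes to newPipeUpdateProcessor: render the new value into scratch
// space, then persist it via the arena.
func newReplaceUpdateFunc(oldSubstr, newSubstr string) func(a *arena, v string) string {
    return func(a *arena, v string) string {
        scratch := []byte(strings.ReplaceAll(v, oldSubstr, newSubstr))
        return a.copyString(scratch)
    }
}

func main() {
    updateFunc := newReplaceUpdateFunc("foo", "bar")
    a := &arena{}
    fmt.Println(updateFunc(a, "foo=1 foo=2")) // bar=1 bar=2
}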

View file

@@ -0,0 +1,224 @@
+package logstorage
+
+import (
+	"math/rand"
+	"slices"
+	"strings"
+	"sync"
+	"testing"
+)
+
+func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) {
+	t.Helper()
+
+	lex := newLexer(pipeStr)
+	p, err := parsePipe(lex)
+	if err != nil {
+		t.Fatalf("unexpected error when parsing %q: %s", pipeStr, err)
+	}
+
+	workersCount := 5
+	stopCh := make(chan struct{})
+	cancel := func() {}
+	ppTest := newTestPipeProcessor()
+	pp := p.newPipeProcessor(workersCount, stopCh, cancel, ppTest)
+
+	brw := newTestBlockResultWriter(workersCount, pp)
+	for _, row := range rows {
+		brw.writeRow(row)
+	}
+	brw.flush()
+	pp.flush()
+
+	ppTest.expectRows(t, rowsExpected)
+}
+
+func newTestBlockResultWriter(workersCount int, ppNext pipeProcessor) *testBlockResultWriter {
+	return &testBlockResultWriter{
+		workersCount: workersCount,
+		ppNext: ppNext,
+	}
+}
+
+type testBlockResultWriter struct {
+	workersCount int
+	ppNext pipeProcessor
+	rcs []resultColumn
+	br blockResult
+
+	rowsCount int
+}
+
+func (brw *testBlockResultWriter) writeRow(row []Field) {
+	if !brw.areSameFields(row) {
+		brw.flush()
+
+		brw.rcs = brw.rcs[:0]
+		for _, field := range row {
+			brw.rcs = appendResultColumnWithName(brw.rcs, field.Name)
+		}
+	}
+
+	for i, field := range row {
+		brw.rcs[i].addValue(field.Value)
+	}
+	brw.rowsCount++
+	if rand.Intn(5) == 0 {
+		brw.flush()
+	}
+}
+
+func (brw *testBlockResultWriter) areSameFields(row []Field) bool {
+	if len(brw.rcs) != len(row) {
+		return false
+	}
+	for i, rc := range brw.rcs {
+		if rc.name != row[i].Name {
+			return false
+		}
+	}
+	return true
+}
+
+func (brw *testBlockResultWriter) flush() {
+	brw.br.setResultColumns(brw.rcs, brw.rowsCount)
+	brw.rowsCount = 0
+	workerID := rand.Intn(brw.workersCount)
+	brw.ppNext.writeBlock(uint(workerID), &brw.br)
+	brw.br.reset()
+	for i := range brw.rcs {
+		brw.rcs[i].resetValues()
+	}
+}
+
+func newTestPipeProcessor() *testPipeProcessor {
+	return &testPipeProcessor{}
+}
+
+type testPipeProcessor struct {
+	resultRowsLock sync.Mutex
+	resultRows [][]Field
+}
+
+func (pp *testPipeProcessor) writeBlock(_ uint, br *blockResult) {
+	cs := br.getColumns()
+	var columnValues [][]string
+	for _, c := range cs {
+		values := c.getValues(br)
+		columnValues = append(columnValues, values)
+	}
+
+	for i := range br.timestamps {
+		row := make([]Field, len(columnValues))
+		for j, values := range columnValues {
+			r := &row[j]
+			r.Name = strings.Clone(cs[j].name)
+			r.Value = strings.Clone(values[i])
+		}
+		pp.resultRowsLock.Lock()
+		pp.resultRows = append(pp.resultRows, row)
+		pp.resultRowsLock.Unlock()
+	}
+}
+
+func (pp *testPipeProcessor) flush() error {
+	return nil
+}
+
+func (pp *testPipeProcessor) expectRows(t *testing.T, expectedRows [][]Field) {
+	t.Helper()
+
+	if len(pp.resultRows) != len(expectedRows) {
+		t.Fatalf("unexpected number of rows; got %d; want %d\nrows got\n%s\nrows expected\n%s",
+			len(pp.resultRows), len(expectedRows), rowsToString(pp.resultRows), rowsToString(expectedRows))
+	}
+
+	sortTestRows(pp.resultRows)
+	sortTestRows(expectedRows)
+
+	for i, resultRow := range pp.resultRows {
+		expectedRow := expectedRows[i]
+		if len(resultRow) != len(expectedRow) {
+			t.Fatalf("unexpected number of fields at row #%d; got %d; want %d\nrow got\n%s\nrow expected\n%s",
+				i, len(resultRow), len(expectedRow), rowToString(resultRow), rowToString(expectedRow))
+		}
+
+		for j, resultField := range resultRow {
+			expectedField := expectedRow[j]
+			if resultField.Name != expectedField.Name {
+				t.Fatalf("unexpected field name at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s",
+					i, resultField.Name, expectedField.Name, rowToString(resultRow), rowToString(expectedRow))
+			}
+			if resultField.Value != expectedField.Value {
+				t.Fatalf("unexpected value for field %q at row #%d; got %q; want %q\nrow got\n%s\nrow expected\n%s",
+					resultField.Name, i, resultField.Value, expectedField.Value, rowToString(resultRow), rowToString(expectedRow))
+			}
+		}
+	}
+}
+
+func sortTestRows(rows [][]Field) {
+	for _, row := range rows {
+		sortTestFields(row)
+	}
+	slices.SortFunc(rows, func(a, b []Field) int {
+		reverse := false
+		if len(a) > len(b) {
+			reverse = true
+			a, b = b, a
+		}
+		for i, fA := range a {
+			fB := b[i]
+			result := cmpTestFields(fA, fB)
+			if result == 0 {
+				continue
+			}
+			if reverse {
+				result = -result
+			}
+			return result
+		}
+		if len(a) == len(b) {
+			return 0
+		}
+		if reverse {
+			return 1
+		}
+		return -1
+	})
+}
+
+func sortTestFields(fields []Field) {
+	slices.SortFunc(fields, cmpTestFields)
+}
+
+func cmpTestFields(a, b Field) int {
+	if a.Name == b.Name {
+		if a.Value == b.Value {
+			return 0
+		}
+		if a.Value < b.Value {
+			return -1
+		}
+		return 1
+	}
+	if a.Name < b.Name {
+		return -1
+	}
+	return 1
+}
+
+func rowsToString(rows [][]Field) string {
+	a := make([]string, len(rows))
+	for i, row := range rows {
+		a[i] = rowToString(row)
+	}
+	return strings.Join(a, "\n")
+}
+
+func rowToString(row []Field) string {
+	a := make([]string, len(row))
+	for i, f := range row {
+		a[i] = f.String()
+	}
+	return "{" + strings.Join(a, ",") + "}"
+}
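With the harness extracted into its own file, any pipe test in the logstorage package can reuse it: rows go in through testBlockResultWriter, come out through testPipeProcessor, and are compared order-insensitively. A hypothetical usage example (the test name, pipe string and rows are illustrative, not part of this commit):

func TestPipeCopyViaSharedHelpers(t *testing.T) {
    // "copy foo as bar" duplicates the foo field into bar, so the
    // expected rows contain both fields.
    expectPipeResults(t, `copy foo as bar`, [][]Field{
        {
            {Name: "foo", Value: "v1"},
        },
    }, [][]Field{
        {
            {Name: "foo", Value: "v1"},
            {Name: "bar", Value: "v1"},
        },
    })
}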