From 9af6c63774d141ef87a55e0872c3250b597a9134 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 27 May 2024 00:58:41 +0200 Subject: [PATCH] wip --- lib/logstorage/pipe_format.go | 12 ++++++------ lib/logstorage/pipe_replace.go | 10 +++++----- lib/logstorage/pipe_replace_regexp.go | 10 +++++----- lib/logstorage/pipe_unpack.go | 10 ++++++---- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go index de580110e..7146f99ed 100644 --- a/lib/logstorage/pipe_format.go +++ b/lib/logstorage/pipe_format.go @@ -4,6 +4,8 @@ import ( "fmt" "strconv" "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeFormat processes '| format ...' pipe. @@ -169,8 +171,8 @@ func (pfp *pipeFormatProcessor) flush() error { } func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string { - bb := bbPool.Get() - b := bb.B + b := shard.a.b + bLen := len(b) for _, step := range pf.steps { b = append(b, step.prefix...) if step.field != "" { @@ -183,11 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult } } } - bb.B = b + shard.a.b = b - v := shard.a.copyBytesToString(b) - bbPool.Put(bb) - return v + return bytesutil.ToUnsafeString(b[bLen:]) } func parsePipeFormat(lex *lexer) (*pipeFormat, error) { diff --git a/lib/logstorage/pipe_replace.go b/lib/logstorage/pipe_replace.go index 13d66c5a8..10e36169d 100644 --- a/lib/logstorage/pipe_replace.go +++ b/lib/logstorage/pipe_replace.go @@ -3,6 +3,8 @@ package logstorage import ( "fmt" "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeReplace processes '| replace ...' pipe. @@ -59,11 +61,9 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { updateFunc := func(a *arena, v string) string { - bb := bbPool.Get() - bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit) - result := a.copyBytesToString(bb.B) - bbPool.Put(bb) - return result + bLen := len(a.b) + a.b = appendReplace(a.b, v, pr.oldSubstr, pr.newSubstr, pr.limit) + return bytesutil.ToUnsafeString(a.b[bLen:]) } return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff) diff --git a/lib/logstorage/pipe_replace_regexp.go b/lib/logstorage/pipe_replace_regexp.go index 24aa5418c..43c951f58 100644 --- a/lib/logstorage/pipe_replace_regexp.go +++ b/lib/logstorage/pipe_replace_regexp.go @@ -3,6 +3,8 @@ package logstorage import ( "fmt" "regexp" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // pipeReplaceRegexp processes '| replace_regexp ...' pipe. @@ -59,11 +61,9 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { updateFunc := func(a *arena, v string) string { - bb := bbPool.Get() - bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit) - result := a.copyBytesToString(bb.B) - bbPool.Put(bb) - return result + bLen := len(a.b) + a.b = appendReplaceRegexp(a.b, v, pr.re, pr.replacement, pr.limit) + return bytesutil.ToUnsafeString(a.b[bLen:]) } return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff) diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index b7861e03e..fde04b369 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -80,10 +80,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) { nameCopy := "" fieldPrefix := uctx.fieldPrefix if fieldPrefix != "" { - nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name)) - copy(nameBuf, fieldPrefix) - copy(nameBuf[len(fieldPrefix):], name) - nameCopy = bytesutil.ToUnsafeString(nameBuf) + b := uctx.a.b + bLen := len(b) + b = append(b, fieldPrefix...) + b = append(b, name...) + uctx.a.b = b + nameCopy = bytesutil.ToUnsafeString(b[bLen:]) } else { nameCopy = uctx.a.copyString(name) }