mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
ad0cc2d55e
commit
9af6c63774
4 changed files with 22 additions and 20 deletions
|
@ -4,6 +4,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strconv"
|
"strconv"
|
||||||
"unsafe"
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// pipeFormat processes '| format ...' pipe.
|
// pipeFormat processes '| format ...' pipe.
|
||||||
|
@ -169,8 +171,8 @@ func (pfp *pipeFormatProcessor) flush() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string {
|
func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string {
|
||||||
bb := bbPool.Get()
|
b := shard.a.b
|
||||||
b := bb.B
|
bLen := len(b)
|
||||||
for _, step := range pf.steps {
|
for _, step := range pf.steps {
|
||||||
b = append(b, step.prefix...)
|
b = append(b, step.prefix...)
|
||||||
if step.field != "" {
|
if step.field != "" {
|
||||||
|
@ -183,11 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
bb.B = b
|
shard.a.b = b
|
||||||
|
|
||||||
v := shard.a.copyBytesToString(b)
|
return bytesutil.ToUnsafeString(b[bLen:])
|
||||||
bbPool.Put(bb)
|
|
||||||
return v
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
|
func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
|
||||||
|
|
|
@ -3,6 +3,8 @@ package logstorage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// pipeReplace processes '| replace ...' pipe.
|
// pipeReplace processes '| replace ...' pipe.
|
||||||
|
@ -59,11 +61,9 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal
|
||||||
|
|
||||||
func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||||
updateFunc := func(a *arena, v string) string {
|
updateFunc := func(a *arena, v string) string {
|
||||||
bb := bbPool.Get()
|
bLen := len(a.b)
|
||||||
bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit)
|
a.b = appendReplace(a.b, v, pr.oldSubstr, pr.newSubstr, pr.limit)
|
||||||
result := a.copyBytesToString(bb.B)
|
return bytesutil.ToUnsafeString(a.b[bLen:])
|
||||||
bbPool.Put(bb)
|
|
||||||
return result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
|
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
|
||||||
|
|
|
@ -3,6 +3,8 @@ package logstorage
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// pipeReplaceRegexp processes '| replace_regexp ...' pipe.
|
// pipeReplaceRegexp processes '| replace_regexp ...' pipe.
|
||||||
|
@ -59,11 +61,9 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi
|
||||||
|
|
||||||
func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||||
updateFunc := func(a *arena, v string) string {
|
updateFunc := func(a *arena, v string) string {
|
||||||
bb := bbPool.Get()
|
bLen := len(a.b)
|
||||||
bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit)
|
a.b = appendReplaceRegexp(a.b, v, pr.re, pr.replacement, pr.limit)
|
||||||
result := a.copyBytesToString(bb.B)
|
return bytesutil.ToUnsafeString(a.b[bLen:])
|
||||||
bbPool.Put(bb)
|
|
||||||
return result
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
|
return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
|
||||||
|
|
|
@ -80,10 +80,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) {
|
||||||
nameCopy := ""
|
nameCopy := ""
|
||||||
fieldPrefix := uctx.fieldPrefix
|
fieldPrefix := uctx.fieldPrefix
|
||||||
if fieldPrefix != "" {
|
if fieldPrefix != "" {
|
||||||
nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
|
b := uctx.a.b
|
||||||
copy(nameBuf, fieldPrefix)
|
bLen := len(b)
|
||||||
copy(nameBuf[len(fieldPrefix):], name)
|
b = append(b, fieldPrefix...)
|
||||||
nameCopy = bytesutil.ToUnsafeString(nameBuf)
|
b = append(b, name...)
|
||||||
|
uctx.a.b = b
|
||||||
|
nameCopy = bytesutil.ToUnsafeString(b[bLen:])
|
||||||
} else {
|
} else {
|
||||||
nameCopy = uctx.a.copyString(name)
|
nameCopy = uctx.a.copyString(name)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue