This commit is contained in:
Aliaksandr Valialkin 2024-05-20 03:52:16 +02:00
parent e4928ee1ff
commit a50045efde
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
8 changed files with 550 additions and 86 deletions

View file

@ -33,10 +33,15 @@ type JSONParser struct {
}
func (p *JSONParser) reset() {
p.resetNobuf()
p.buf = p.buf[:0]
}
func (p *JSONParser) resetNobuf() {
clear(p.Fields)
p.Fields = p.Fields[:0]
p.buf = p.buf[:0]
p.prefixBuf = p.prefixBuf[:0]
}
@ -90,6 +95,8 @@ func (p *JSONParser) parseLogMessage(msg, prefix string, resetBuf bool) error {
}
if resetBuf {
p.reset()
} else {
p.resetNobuf()
}
p.prefixBuf = append(p.prefixBuf[:0], prefix...)
p.Fields, p.buf, p.prefixBuf = appendLogFields(p.Fields, p.buf, p.prefixBuf, v)

View file

@ -1008,6 +1008,12 @@ func TestParseQuerySuccess(t *testing.T) {
f(`* | unpack_json from x`, `* | unpack_json from x`)
f(`* | unpack_json from x result_prefix y`, `* | unpack_json from x result_prefix y`)
// unpack_logfmt pipe
f(`* | unpack_logfmt`, `* | unpack_logfmt`)
f(`* | unpack_logfmt result_prefix y`, `* | unpack_logfmt result_prefix y`)
f(`* | unpack_logfmt from x`, `* | unpack_logfmt from x`)
f(`* | unpack_logfmt from x result_prefix y`, `* | unpack_logfmt from x result_prefix y`)
// multiple different pipes
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
@ -1411,11 +1417,18 @@ func TestParseQueryFailure(t *testing.T) {
f(`foo | extract from x "<abc>" de`)
// invalid unpack_json pipe
f(`foo | extract_json bar`)
f(`foo | extract_json from`)
f(`foo | extract_json result_prefix`)
f(`foo | extract_json result_prefix x from y`)
f(`foo | extract_json from x result_prefix`)
f(`foo | unpack_json bar`)
f(`foo | unpack_json from`)
f(`foo | unpack_json result_prefix`)
f(`foo | unpack_json result_prefix x from y`)
f(`foo | unpack_json from x result_prefix`)
// invalid unpack_logfmt pipe
f(`foo | unpack_logfmt bar`)
f(`foo | unpack_logfmt from`)
f(`foo | unpack_logfmt result_prefix`)
f(`foo | unpack_logfmt result_prefix x from y`)
f(`foo | unpack_logfmt from x result_prefix`)
}
func TestQueryGetNeededColumns(t *testing.T) {
@ -1574,6 +1587,13 @@ func TestQueryGetNeededColumns(t *testing.T) {
f(`* | unpack_json from s1 | rm f1`, `*`, `f1`)
f(`* | unpack_json from s1 | rm f1,s1`, `*`, `f1`)
f(`* | unpack_logfmt`, `*`, ``)
f(`* | unpack_logfmt from s1`, `*`, ``)
f(`* | unpack_logfmt from s1 | fields f1`, `f1,s1`, ``)
f(`* | unpack_logfmt from s1 | fields s1,f1`, `f1,s1`, ``)
f(`* | unpack_logfmt from s1 | rm f1`, `*`, `f1`)
f(`* | unpack_logfmt from s1 | rm f1,s1`, `*`, `f1`)
f(`* | rm f1, f2`, `*`, `f1,f2`)
f(`* | rm f1, f2 | mv f2 f3`, `*`, `f1,f2,f3`)
f(`* | rm f1, f2 | cp f2 f3`, `*`, `f1,f2,f3`)

View file

@ -157,6 +157,12 @@ func parsePipe(lex *lexer) (pipe, error) {
return nil, fmt.Errorf("cannot parse 'unpack_json' pipe: %w", err)
}
return pu, nil
case lex.isKeyword("unpack_logfmt"):
pu, err := parsePipeUnpackLogfmt(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
}
return pu, nil
default:
return nil, fmt.Errorf("unexpected pipe %q", lex.token)
}

View file

@ -252,8 +252,8 @@ func (ef *extractFormat) apply(s string) {
nextPrefix = steps[i+1].prefix
}
us, nOffset, ok := tryUnquoteString(s)
if ok {
us, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
// Matched quoted string
matches[i] = us
s = s[nOffset:]
@ -279,22 +279,22 @@ func (ef *extractFormat) apply(s string) {
}
}
func tryUnquoteString(s string) (string, int, bool) {
func tryUnquoteString(s string) (string, int) {
if len(s) == 0 {
return s, 0, false
return s, -1
}
if s[0] != '"' && s[0] != '`' {
return s, 0, false
return s, -1
}
qp, err := strconv.QuotedPrefix(s)
if err != nil {
return s, 0, false
return s, -1
}
us, err := strconv.Unquote(qp)
if err != nil {
return s, 0, false
return s, -1
}
return us, len(qp), true
return us, len(qp)
}
func parseExtractFormatSteps(s string) ([]extractFormatStep, error) {

View file

@ -60,72 +60,9 @@ type pipeUnpackJSONProcessorShard struct {
}
type pipeUnpackJSONProcessorShardNopad struct {
jsonParser JSONParser
p JSONParser
rcs []resultColumn
br blockResult
valuesLen int
}
func (shard *pipeUnpackJSONProcessorShard) writeRow(ppBase pipeProcessor, br *blockResult, cs []*blockResultColumn, rowIdx int, extraFields []Field) {
rcs := shard.rcs
areEqualColumns := len(rcs) == len(cs)+len(extraFields)
if areEqualColumns {
for i, f := range extraFields {
if rcs[len(cs)+i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
shard.flush(ppBase)
rcs = shard.rcs[:0]
for _, c := range cs {
rcs = appendResultColumnWithName(rcs, c.name)
}
for _, f := range extraFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
shard.rcs = rcs
}
for i, c := range cs {
v := c.getValueAtRow(br, rowIdx)
rcs[i].addValue(v)
shard.valuesLen += len(v)
}
for i, f := range extraFields {
v := f.Value
rcs[len(cs)+i].addValue(v)
shard.valuesLen += len(v)
}
if shard.valuesLen >= 1_000_000 {
shard.flush(ppBase)
}
}
func (shard *pipeUnpackJSONProcessorShard) flush(ppBase pipeProcessor) {
rcs := shard.rcs
shard.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br := &shard.br
br.setResultColumns(rcs)
ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
wctx pipeUnpackWriteContext
}
func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []Field {
@ -133,11 +70,11 @@ func (shard *pipeUnpackJSONProcessorShard) parseJSON(v, resultPrefix string) []F
// This isn't a JSON object
return nil
}
if err := shard.jsonParser.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil {
if err := shard.p.ParseLogMessageNoResetBuf(v, resultPrefix); err != nil {
// Cannot parse v
return nil
}
return shard.jsonParser.Fields
return shard.p.Fields
}
func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
@ -147,14 +84,15 @@ func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
resultPrefix := pup.pu.resultPrefix
shard := &pup.shards[workerID]
wctx := &shard.wctx
wctx.init(br, pup.ppBase)
cs := br.getColumns()
c := br.getColumnByName(pup.pu.fromField)
if c.isConst {
v := c.valuesEncoded[0]
extraFields := shard.parseJSON(v, resultPrefix)
for rowIdx := range br.timestamps {
shard.writeRow(pup.ppBase, br, cs, rowIdx, extraFields)
wctx.writeRow(rowIdx, extraFields)
}
} else {
values := c.getValues(br)
@ -163,12 +101,12 @@ func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
if i == 0 || values[i-1] != v {
extraFields = shard.parseJSON(v, resultPrefix)
}
shard.writeRow(pup.ppBase, br, cs, i, extraFields)
wctx.writeRow(i, extraFields)
}
}
shard.flush(pup.ppBase)
shard.jsonParser.reset()
wctx.flush()
shard.p.reset()
}
func (pup *pipeUnpackJSONProcessor) flush() error {

View file

@ -133,6 +133,35 @@ func TestPipeUnpackJSON(t *testing.T) {
},
})
// multiple rows with distinct number of fields with result_prefix
f("unpack_json from x result_prefix qwe_", [][]Field{
{
{"x", `{"foo":"bar","baz":"xyz"}`},
{"y", `abc`},
},
{
{"y", `abc`},
},
{
{"z", `foobar`},
{"x", `{"z":["bar",123]}`},
},
}, [][]Field{
{
{"x", `{"foo":"bar","baz":"xyz"}`},
{"y", "abc"},
{"qwe_foo", "bar"},
{"qwe_baz", "xyz"},
},
{
{"y", `abc`},
},
{
{"z", `foobar`},
{"x", `{"z":["bar",123]}`},
{"qwe_z", `["bar",123]`},
},
})
}
func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) {

View file

@ -0,0 +1,289 @@
package logstorage
import (
"fmt"
"strings"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
// pipeUnpackLogfmt processes '| unpack_logfmt ...' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_logfmt-pipe
type pipeUnpackLogfmt struct {
fromField string
resultPrefix string
}
func (pu *pipeUnpackLogfmt) String() string {
s := "unpack_logfmt"
if !isMsgFieldName(pu.fromField) {
s += " from " + quoteTokenIfNeeded(pu.fromField)
}
if pu.resultPrefix != "" {
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
}
return s
}
func (pu *pipeUnpackLogfmt) updateNeededFields(neededFields, unneededFields fieldsSet) {
if neededFields.contains("*") {
unneededFields.remove(pu.fromField)
} else {
neededFields.add(pu.fromField)
}
}
func (pu *pipeUnpackLogfmt) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor {
shards := make([]pipeUnpackLogfmtProcessorShard, workersCount)
pup := &pipeUnpackLogfmtProcessor{
pu: pu,
ppBase: ppBase,
shards: shards,
}
return pup
}
type pipeUnpackLogfmtProcessor struct {
pu *pipeUnpackLogfmt
ppBase pipeProcessor
shards []pipeUnpackLogfmtProcessorShard
}
type pipeUnpackLogfmtProcessorShard struct {
pipeUnpackLogfmtProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeUnpackLogfmtProcessorShardNopad{})%128]byte
}
type pipeUnpackLogfmtProcessorShardNopad struct {
p logfmtParser
wctx pipeUnpackWriteContext
}
func (pup *pipeUnpackLogfmtProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
resultPrefix := pup.pu.resultPrefix
shard := &pup.shards[workerID]
wctx := &shard.wctx
wctx.init(br, pup.ppBase)
c := br.getColumnByName(pup.pu.fromField)
if c.isConst {
v := c.valuesEncoded[0]
extraFields := shard.p.parse(v, resultPrefix)
for rowIdx := range br.timestamps {
wctx.writeRow(rowIdx, extraFields)
}
} else {
values := c.getValues(br)
var extraFields []Field
for i, v := range values {
if i == 0 || values[i-1] != v {
extraFields = shard.p.parse(v, resultPrefix)
}
wctx.writeRow(i, extraFields)
}
}
wctx.flush()
shard.p.reset()
}
func (pup *pipeUnpackLogfmtProcessor) flush() error {
return nil
}
func parsePipeUnpackLogfmt(lex *lexer) (*pipeUnpackLogfmt, error) {
if !lex.isKeyword("unpack_logfmt") {
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_logfmt")
}
lex.nextToken()
fromField := "_msg"
if lex.isKeyword("from") {
lex.nextToken()
f, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
}
fromField = f
}
resultPrefix := ""
if lex.isKeyword("result_prefix") {
lex.nextToken()
p, err := getCompoundToken(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err)
}
resultPrefix = p
}
pu := &pipeUnpackLogfmt{
fromField: fromField,
resultPrefix: resultPrefix,
}
return pu, nil
}
type pipeUnpackWriteContext struct {
brSrc *blockResult
csSrc []*blockResultColumn
ppBase pipeProcessor
rcs []resultColumn
br blockResult
valuesLen int
}
func (wctx *pipeUnpackWriteContext) init(brSrc *blockResult, ppBase pipeProcessor) {
wctx.brSrc = brSrc
wctx.csSrc = brSrc.getColumns()
wctx.ppBase = ppBase
}
func (wctx *pipeUnpackWriteContext) writeRow(rowIdx int, extraFields []Field) {
csSrc := wctx.csSrc
rcs := wctx.rcs
areEqualColumns := len(rcs) == len(csSrc)+len(extraFields)
if areEqualColumns {
for i, f := range extraFields {
if rcs[len(csSrc)+i].name != f.Name {
areEqualColumns = false
break
}
}
}
if !areEqualColumns {
// send the current block to bbBase and construct a block with new set of columns
wctx.flush()
rcs = wctx.rcs[:0]
for _, c := range csSrc {
rcs = appendResultColumnWithName(rcs, c.name)
}
for _, f := range extraFields {
rcs = appendResultColumnWithName(rcs, f.Name)
}
wctx.rcs = rcs
}
brSrc := wctx.brSrc
for i, c := range csSrc {
v := c.getValueAtRow(brSrc, rowIdx)
rcs[i].addValue(v)
wctx.valuesLen += len(v)
}
for i, f := range extraFields {
v := f.Value
rcs[len(csSrc)+i].addValue(v)
wctx.valuesLen += len(v)
}
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeUnpackWriteContext) flush() {
rcs := wctx.rcs
wctx.valuesLen = 0
if len(rcs) == 0 {
return
}
// Flush rcs to ppBase
br := &wctx.br
br.setResultColumns(rcs)
wctx.ppBase.writeBlock(0, br)
br.reset()
for i := range rcs {
rcs[i].resetValues()
}
}
type logfmtParser struct {
Fields []Field
buf []byte
}
func (p *logfmtParser) reset() {
clear(p.Fields)
p.Fields = p.Fields[:0]
p.buf = p.buf[:0]
}
func (p *logfmtParser) parse(s, resultPrefix string) []Field {
clear(p.Fields)
p.Fields = p.Fields[:0]
for {
// Search for field name
n := strings.IndexByte(s, '=')
if n < 0 {
// field name couldn't be read
return p.Fields
}
name := strings.TrimSpace(s[:n])
s = s[n+1:]
if len(s) == 0 {
p.addField(name, "", resultPrefix)
return p.Fields
}
// Search for field value
value, nOffset := tryUnquoteString(s)
if nOffset >= 0 {
p.addField(name, value, resultPrefix)
s = s[nOffset:]
if len(s) == 0 {
return p.Fields
}
if s[0] != ' ' {
return p.Fields
}
s = s[1:]
} else {
n := strings.IndexByte(s, ' ')
if n < 0 {
p.addField(name, s, resultPrefix)
return p.Fields
}
p.addField(name, s[:n], resultPrefix)
s = s[n+1:]
}
}
}
func (p *logfmtParser) addField(name, value, resultPrefix string) {
if resultPrefix != "" {
buf := p.buf
bufLen := len(buf)
buf = append(buf, resultPrefix...)
buf = append(buf, name...)
p.buf = buf
name = bytesutil.ToUnsafeString(buf[bufLen:])
}
p.Fields = append(p.Fields, Field{
Name: name,
Value: value,
})
}

View file

@ -0,0 +1,175 @@
package logstorage
import (
"testing"
)
func TestPipeUnpackLogfmt(t *testing.T) {
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// single row, unpack from _msg
f("unpack_logfmt", [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
},
}, [][]Field{
{
{"_msg", `foo=bar baz="x y=z" a=b`},
{"foo", "bar"},
{"baz", "x y=z"},
{"a", "b"},
},
})
// single row, unpack from _msg into _msg
f("unpack_logfmt", [][]Field{
{
{"_msg", `_msg=bar`},
},
}, [][]Field{
{
{"_msg", "bar"},
},
})
// single row, unpack from missing field
f("unpack_logfmt from x", [][]Field{
{
{"_msg", `foo=bar`},
},
}, [][]Field{
{
{"_msg", `foo=bar`},
},
})
// single row, unpack from non-json field
f("unpack_logfmt from x", [][]Field{
{
{"x", `foobar`},
},
}, [][]Field{
{
{"x", `foobar`},
},
})
// single row, unpack from non-logfmt
f("unpack_logfmt from x", [][]Field{
{
{"x", `foobar`},
},
}, [][]Field{
{
{"x", `foobar`},
},
})
// unpack empty value
f("unpack_logfmt from x", [][]Field{
{
{"x", `foobar=`},
},
}, [][]Field{
{
{"x", `foobar=`},
{"foobar", ""},
},
})
f("unpack_logfmt from x", [][]Field{
{
{"x", `foo="" bar= baz=`},
},
}, [][]Field{
{
{"x", `foo="" bar= baz=`},
{"foo", ""},
{"bar", ""},
{"baz", ""},
},
})
// multiple rows with distinct number of fields
f("unpack_logfmt from x", [][]Field{
{
{"x", `foo=bar baz=xyz`},
{"y", `abc`},
},
{
{"y", `abc`},
},
{
{"z", `foobar`},
{"x", `z=bar`},
},
}, [][]Field{
{
{"x", `foo=bar baz=xyz`},
{"y", "abc"},
{"foo", "bar"},
{"baz", "xyz"},
},
{
{"y", `abc`},
},
{
{"z", `bar`},
{"x", `z=bar`},
},
})
// multiple rows with distinct number of fields, with result_prefix
f("unpack_logfmt from x result_prefix qwe_", [][]Field{
{
{"x", `foo=bar baz=xyz`},
{"y", `abc`},
},
{
{"y", `abc`},
},
{
{"z", `foobar`},
{"x", `z=bar`},
},
}, [][]Field{
{
{"x", `foo=bar baz=xyz`},
{"y", "abc"},
{"qwe_foo", "bar"},
{"qwe_baz", "xyz"},
},
{
{"y", `abc`},
},
{
{"z", `foobar`},
{"x", `z=bar`},
{"qwe_z", `bar`},
},
})
}
func TestPipeUnpackLogfmtUpdateNeededFields(t *testing.T) {
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper()
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
}
// all the needed fields
f("unpack_logfmt from x", "*", "", "*", "")
// all the needed fields, unneeded fields do not intersect with src
f("unpack_logfmt from x", "*", "f1,f2", "*", "f1,f2")
// all the needed fields, unneeded fields intersect with src
f("unpack_logfmt from x", "*", "f2,x", "*", "f2")
// needed fields do not intersect with src
f("unpack_logfmt from x", "f1,f2", "", "f1,f2,x", "")
// needed fields intersect with src
f("unpack_logfmt from x", "f2,x", "", "f2,x", "")
}