diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index 89e7b8fd1..552cffd47 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -1810,11 +1810,19 @@ type resultColumn struct {
 	values []string
 }
 
-func (rc *resultColumn) resetKeepName() {
+func (rc *resultColumn) resetValues() {
 	clear(rc.values)
 	rc.values = rc.values[:0]
 }
 
+func appendResultColumnWithName(dst []resultColumn, name string) []resultColumn {
+	dst = slicesutil.SetLength(dst, len(dst)+1)
+	rc := &dst[len(dst)-1]
+	rc.resetValues()
+	rc.name = name
+	return dst
+}
+
 // addValue adds the given values v to rc.
 func (rc *resultColumn) addValue(v string) {
 	rc.values = append(rc.values, v)
diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go
index 541688b6b..67850946f 100644
--- a/lib/logstorage/pipe_extract.go
+++ b/lib/logstorage/pipe_extract.go
@@ -31,14 +31,14 @@ func (pe *pipeExtract) String() string {
 
 func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet) {
 	if neededFields.contains("*") {
+		unneededFieldsOrig := unneededFields.clone()
 		needFromField := false
 		for _, step := range pe.steps {
 			if step.field != "" {
-				if !unneededFields.contains(step.field) {
+				if !unneededFieldsOrig.contains(step.field) {
 					needFromField = true
-				} else {
-					unneededFields.remove(step.field)
 				}
+				unneededFields.add(step.field)
 			}
 		}
 		if needFromField {
@@ -49,11 +49,9 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
 	} else {
 		needFromField := false
 		for _, step := range pe.steps {
-			if step.field != "" {
-				if neededFields.contains(step.field) {
-					needFromField = true
-					neededFields.remove(step.field)
-				}
+			if step.field != "" && neededFields.contains(step.field) {
+				needFromField = true
+				neededFields.remove(step.field)
 			}
 		}
 		if needFromField {
@@ -145,7 +143,7 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
 	pep.ppBase.writeBlock(workerID, br)
 
 	for i := range rcs {
-		rcs[i].resetKeepName()
+		rcs[i].resetValues()
 	}
 }
 
diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go
index 8d0d05d1a..ad6f0f3ea 100644
--- a/lib/logstorage/pipe_extract_test.go
+++ b/lib/logstorage/pipe_extract_test.go
@@ -196,19 +196,19 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) {
 	}
 
 	// all the needed fields
-	f("extract from x '<foo>'", "*", "", "*", "")
+	f("extract from x '<foo>'", "*", "", "*", "foo")
 
 	// all the needed fields, unneeded fields do not intersect with fromField and output fields
-	f("extract from x '<foo>'", "*", "f1,f2", "*", "f1,f2")
+	f("extract from x '<foo>'", "*", "f1,f2", "*", "f1,f2,foo")
 
 	// all the needed fields, unneeded fields intersect with fromField
-	f("extract from x '<foo>'", "*", "f2,x", "*", "f2")
+	f("extract from x '<foo>'", "*", "f2,x", "*", "f2,foo")
 
 	// all the needed fields, unneeded fields intersect with output fields
-	f("extract from x '<foo>x<bar>'", "*", "f2,foo", "*", "f2")
+	f("extract from x '<foo>x<bar>'", "*", "f2,foo", "*", "bar,f2,foo")
 
 	// all the needed fields, unneeded fields intersect with all the output fields
-	f("extract from x '<foo>x<bar>'", "*", "f2,foo,bar", "*", "f2,x")
+	f("extract from x '<foo>x<bar>'", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
 
 	// needed fields do not intersect with fromField and output fields
 	f("extract from x '<foo>x<bar>'", "f1,f2", "", "f1,f2", "")
diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go
index 8209c77ca..9ac1a206d 100644
--- a/lib/logstorage/pipe_field_names.go
+++ b/lib/logstorage/pipe_field_names.go
@@ -143,7 +143,7 @@ func (wctx *pipeFieldNamesWriteContext) flush() {
 	br.setResultColumns(wctx.rcs[:1])
 	wctx.pfp.ppBase.writeBlock(0, br)
 	br.reset()
-	wctx.rcs[0].resetKeepName()
+	wctx.rcs[0].resetValues()
 }
 
 func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
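The rename from resetKeepName to resetValues and the new appendResultColumnWithName helper above are what the remaining files switch to. Since slicesutil.SetLength may hand back a previously used element whose values slice still references old strings, the helper clears the element before setting its name. A minimal sketch of the resulting call pattern (package logstorage assumed; buildColumns is a hypothetical helper, not part of this patch):

	// buildColumns rebuilds rcs for the given column names, reusing the
	// backing array. appendResultColumnWithName clears any stale values
	// left in a reused element, so old strings cannot leak into the next block.
	func buildColumns(rcs []resultColumn, names []string) []resultColumn {
		rcs = rcs[:0]
		for _, name := range names {
			rcs = appendResultColumnWithName(rcs, name)
		}
		return rcs
	}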
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index d92b1924a..a29da7aa4 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -521,14 +521,10 @@ func (wctx *pipeSortWriteContext) writeNextRow(shard *pipeSortProcessorShard) {
 
 		rcs = wctx.rcs[:0]
 		for _, bf := range byFields {
-			rcs = append(rcs, resultColumn{
-				name: bf.name,
-			})
+			rcs = appendResultColumnWithName(rcs, bf.name)
 		}
 		for _, c := range b.otherColumns {
-			rcs = append(rcs, resultColumn{
-				name: c.name,
-			})
+			rcs = appendResultColumnWithName(rcs, c.name)
 		}
 		wctx.rcs = rcs
 	}
@@ -567,7 +563,7 @@ func (wctx *pipeSortWriteContext) flush() {
 	wctx.psp.ppBase.writeBlock(0, br)
 	br.reset()
 	for i := range rcs {
-		rcs[i].resetKeepName()
+		rcs[i].resetValues()
 	}
 }
 
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index ea9e34ab7..1397a1e60 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -434,14 +434,10 @@ func (psp *pipeStatsProcessor) flush() error {
 
 	rcs := make([]resultColumn, 0, len(byFields)+len(psp.ps.funcs))
 	for _, bf := range byFields {
-		rcs = append(rcs, resultColumn{
-			name: bf.name,
-		})
+		rcs = appendResultColumnWithName(rcs, bf.name)
 	}
 	for _, f := range psp.ps.funcs {
-		rcs = append(rcs, resultColumn{
-			name: f.resultName,
-		})
+		rcs = appendResultColumnWithName(rcs, f.resultName)
 	}
 
 	var br blockResult
@@ -487,7 +483,7 @@ func (psp *pipeStatsProcessor) flush() error {
 		psp.ppBase.writeBlock(0, &br)
 		br.reset()
 		for i := range rcs {
-			rcs[i].resetKeepName()
+			rcs[i].resetValues()
 		}
 		valuesLen = 0
 	}
diff --git a/lib/logstorage/pipe_topk.go b/lib/logstorage/pipe_topk.go
index 34504df90..ea77370e1 100644
--- a/lib/logstorage/pipe_topk.go
+++ b/lib/logstorage/pipe_topk.go
@@ -462,14 +462,10 @@ func (wctx *pipeTopkWriteContext) writeNextRow(shard *pipeTopkProcessorShard) bo
 
 		rcs = wctx.rcs[:0]
 		for _, bf := range byFields {
-			rcs = append(rcs, resultColumn{
-				name: bf.name,
-			})
+			rcs = appendResultColumnWithName(rcs, bf.name)
 		}
 		for _, c := range r.otherColumns {
-			rcs = append(rcs, resultColumn{
-				name: c.Name,
-			})
+			rcs = appendResultColumnWithName(rcs, c.Name)
 		}
 		wctx.rcs = rcs
 	}
@@ -515,7 +511,7 @@ func (wctx *pipeTopkWriteContext) flush() {
 	wctx.ptp.ppBase.writeBlock(0, br)
 	br.reset()
 	for i := range rcs {
-		rcs[i].resetKeepName()
+		rcs[i].resetValues()
 	}
 }
 
diff --git a/lib/logstorage/pipe_uniq.go b/lib/logstorage/pipe_uniq.go
index 03068bb73..6b43fb442 100644
--- a/lib/logstorage/pipe_uniq.go
+++ b/lib/logstorage/pipe_uniq.go
@@ -359,9 +359,7 @@ func (wctx *pipeUniqWriteContext) writeRow(rowFields []Field) {
 
 		rcs = wctx.rcs[:0]
 		for _, f := range rowFields {
-			rcs = append(rcs, resultColumn{
-				name: f.Name,
-			})
+			rcs = appendResultColumnWithName(rcs, f.Name)
 		}
 		wctx.rcs = rcs
 	}
@@ -391,7 +389,7 @@ func (wctx *pipeUniqWriteContext) flush() {
 	wctx.pup.ppBase.writeBlock(0, br)
 	br.reset()
 	for i := range rcs {
-		rcs[i].resetKeepName()
+		rcs[i].resetValues()
 	}
 }
 
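The two new files below add the unpack_json pipe. As a hedged illustration of the syntax accepted by parsePipeUnpackJSON further down (the field name my_json is invented for the example), a query such as

	_time:5m | unpack_json from my_json result_prefix "p."

takes the JSON object stored in the my_json field of each selected log entry and exposes every top-level key as a field named p.<key>; string values are referenced as-is, while non-string values are re-marshaled to their JSON representation.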
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +// pipeUnpackJSON processes '| unpack_json ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe +type pipeUnpackJSON struct { + fromField string + + resultPrefix string +} + +func (pu *pipeUnpackJSON) String() string { + s := "unpack_json" + if !isMsgFieldName(pu.fromField) { + s += " from " + quoteTokenIfNeeded(pu.fromField) + } + if pu.resultPrefix != "" { + s += " prefix " + quoteTokenIfNeeded(pu.resultPrefix) + } + return s +} + +func (pu *pipeUnpackJSON) updateNeededFields(neededFields, unneededFields fieldsSet) { + if neededFields.contains("*") { + unneededFields.remove(pu.fromField) + } else { + neededFields.add(pu.fromField) + } +} + +func (pu *pipeUnpackJSON) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppBase pipeProcessor) pipeProcessor { + shards := make([]pipeUnpackJSONProcessorShard, workersCount) + + pup := &pipeUnpackJSONProcessor{ + pu: pu, + stopCh: stopCh, + ppBase: ppBase, + + shards: shards, + } + return pup +} + +type pipeUnpackJSONProcessor struct { + pu *pipeUnpackJSON + stopCh <-chan struct{} + ppBase pipeProcessor + + shards []pipeUnpackJSONProcessorShard +} + +type pipeUnpackJSONProcessorShard struct { + pipeUnpackJSONProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . + _ [128 - unsafe.Sizeof(pipeUnpackJSONProcessorShardNopad{})%128]byte +} + +type pipeUnpackJSONProcessorShardNopad struct { + jsonParser fastjson.Parser + jsonFields []Field + jsonValuesBuf []byte + + rcs []resultColumn + br blockResult + + valuesLen int +} + +func (shard *pipeUnpackJSONProcessorShard) writeRow(ppBase pipeProcessor, br *blockResult, cs []*blockResultColumn, rowIdx int, extraFields []Field) { + rcs := shard.rcs + + areEqualColumns := len(rcs) == len(cs)+len(extraFields) + if areEqualColumns { + for i, f := range extraFields { + if rcs[len(rcs)+i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to bbBase and construct a block with new set of columns + shard.flush(ppBase) + + rcs = shard.rcs[:0] + for _, c := range cs { + rcs = appendResultColumnWithName(rcs, c.name) + } + for _, f := range extraFields { + rcs = appendResultColumnWithName(rcs, f.Name) + } + shard.rcs = rcs + } + + for i, c := range cs { + v := c.getValueAtRow(br, rowIdx) + rcs[i].addValue(v) + shard.valuesLen += len(v) + } + for i, f := range extraFields { + v := f.Value + rcs[len(rcs)+i].addValue(v) + shard.valuesLen += len(v) + } + if shard.valuesLen >= 1_000_000 { + shard.flush(ppBase) + } +} + +func (shard *pipeUnpackJSONProcessorShard) flush(ppBase pipeProcessor) { + rcs := shard.rcs + + shard.valuesLen = 0 + + if len(rcs) == 0 { + return + } + + // Flush rcs to ppBase + br := &shard.br + br.setResultColumns(rcs) + ppBase.writeBlock(0, br) + br.reset() + for i := range rcs { + rcs[i].resetValues() + } +} + +func (shard *pipeUnpackJSONProcessorShard) parseJSONFields(resultPrefix, v string) { + clear(shard.jsonFields) + shard.jsonFields = shard.jsonFields[:0] + shard.jsonValuesBuf = shard.jsonValuesBuf[:0] + + jsv, err := shard.jsonParser.Parse(v) + if err != nil { + return + } + jso := jsv.GetObject() + buf := shard.jsonValuesBuf + jso.Visit(func(k []byte, v *fastjson.Value) { + var bv []byte + if v.Type() == fastjson.TypeString { + bv = v.GetStringBytes() + } else { + bufLen := len(buf) + buf = v.MarshalTo(buf) + bv = buf[bufLen:] + } + if 
resultPrefix != "" { + bufLen := len(buf) + buf = append(buf, resultPrefix...) + buf = append(buf, k...) + k = buf[bufLen:] + } + shard.jsonFields = append(shard.jsonFields, Field{ + Name: bytesutil.ToUnsafeString(k), + Value: bytesutil.ToUnsafeString(bv), + }) + }) + shard.jsonValuesBuf = buf +} + +func (pup *pipeUnpackJSONProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + resultPrefix := pup.pu.resultPrefix + shard := &pup.shards[workerID] + + cs := br.getColumns() + c := br.getColumnByName(pup.pu.fromField) + if c.isConst { + v := c.valuesEncoded[0] + shard.parseJSONFields(resultPrefix, v) + for rowIdx := range br.timestamps { + shard.writeRow(pup.ppBase, br, cs, rowIdx, shard.jsonFields) + } + } else { + values := c.getValues(br) + for i, v := range values { + if i == 0 || values[i-1] != v { + shard.parseJSONFields(resultPrefix, v) + } + shard.writeRow(pup.ppBase, br, cs, i, shard.jsonFields) + } + } + + shard.flush(pup.ppBase) +} + +func (pup *pipeUnpackJSONProcessor) flush() error { + return nil +} + +func parsePipeUnpackJSON(lex *lexer) (*pipeUnpackJSON, error) { + if !lex.isKeyword("unpack_json") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_json") + } + lex.nextToken() + + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field name: %w", err) + } + fromField = f + } + + resultPrefix := "" + if lex.isKeyword("result_prefix") { + lex.nextToken() + p, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err) + } + resultPrefix = p + } + + pu := &pipeUnpackJSON{ + fromField: fromField, + resultPrefix: resultPrefix, + } + return pu, nil +} diff --git a/lib/logstorage/pipe_unpack_json_test.go b/lib/logstorage/pipe_unpack_json_test.go new file mode 100644 index 000000000..8d3dc1c9a --- /dev/null +++ b/lib/logstorage/pipe_unpack_json_test.go @@ -0,0 +1,38 @@ +package logstorage + +import ( + "testing" +) + +func TestPipeUnpackJSONUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + + nfs := newTestFieldsSet(neededFields) + unfs := newTestFieldsSet(unneededFields) + + lex := newLexer(s) + p, err := parsePipeUnpackJSON(lex) + if err != nil { + t.Fatalf("cannot parse %s: %s", s, err) + } + p.updateNeededFields(nfs, unfs) + + assertNeededFields(t, nfs, unfs, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("unpack_json from x", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("unpack_json from x", "*", "f1,f2", "*", "f1,f2") + + // all the needed fields, unneeded fields intersect with src + f("unpack_json from x", "*", "f2,x", "*", "f2") + + // needed fields do not intersect with src + f("unpack_json from x", "f1,f2", "", "f1,f2,x", "") + + // needed fields intersect with src + f("unpack_json from x", "f2,x", "", "f2,x", "") +}