VictoriaMetrics/lib/logstorage/pipe_unroll.go
Aliaksandr Valialkin a4ea3b87d7
lib/logstorage: optimize query immediately after its parsing
This eliminates possible bugs related to forgotten Query.Optimize() calls.

This also allows removing optimize() function from pipe interface.

While at it, drop filterNoop inside filterAnd.

(cherry picked from commit 66b2987f49)
2024-11-08 17:07:56 +01:00

280 lines
6.1 KiB
Go

package logstorage
import (
"fmt"
"slices"
"unsafe"
"github.com/valyala/fastjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
// pipeUnroll processes '| unroll ...' pipe.
//
// For every input row the values of the listed fields are unpacked as JSON arrays
// and the row is written once per array item (see writeUnrolledFields).
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe
type pipeUnroll struct {
// fields contains the names of the fields to unroll.
//
// parsePipeUnroll guarantees that it is non-empty and doesn't contain "*".
fields []string
// iff is an optional filter for skipping the unroll.
//
// When it is set, only the rows matching the filter are unrolled;
// the remaining rows are written with the original values of the listed fields.
iff *ifFilter
}
// String returns the textual form of the pipe: the "unroll" keyword,
// followed by the optional if filter and the "by (...)" list of fields.
func (pu *pipeUnroll) String() string {
	prefix := "unroll"
	if pu.iff != nil {
		prefix = prefix + " " + pu.iff.String()
	}
	return prefix + " by (" + fieldNamesString(pu.fields) + ")"
}
// canLiveTail always returns true for the unroll pipe.
//
// NOTE(review): presumably this tells the caller that the pipe may be used
// in live tailing queries; confirm against the pipe interface definition.
func (pu *pipeUnroll) canLiveTail() bool {
return true
}
// hasFilterInWithQuery returns the result of hasFilterInWithQuery on the optional if filter.
//
// NOTE(review): pu.iff is nil when the pipe has no `if (...)` part, so the callee
// presumably tolerates a nil receiver; confirm against the ifFilter implementation.
func (pu *pipeUnroll) hasFilterInWithQuery() bool {
	iff := pu.iff
	return iff.hasFilterInWithQuery()
}
// initFilterInValues returns a shallow copy of pu with the if filter replaced
// by the result of iff.initFilterInValues(cache, getFieldValuesFunc).
//
// The receiver is left unmodified. The error returned by the if filter
// initialization is passed to the caller as is.
func (pu *pipeUnroll) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
	iff, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc)
	if err != nil {
		return nil, err
	}

	clone := new(pipeUnroll)
	*clone = *pu
	clone.iff = iff
	return clone, nil
}
// updateNeededFields registers the fields read by the pipe: the unrolled fields
// and the fields referenced by the optional if filter.
//
// If neededFields contains "*", these fields are removed from unneededFields.
// Otherwise they are added to neededFields.
func (pu *pipeUnroll) updateNeededFields(neededFields, unneededFields fieldsSet) {
	if !neededFields.contains("*") {
		// Only the explicitly listed fields are needed - extend the list.
		if pu.iff != nil {
			neededFields.addFields(pu.iff.neededFields)
		}
		neededFields.addFields(pu.fields)
		return
	}

	// All the fields are needed - make sure ours are not excluded.
	if pu.iff != nil {
		unneededFields.removeFields(pu.iff.neededFields)
	}
	unneededFields.removeFields(pu.fields)
}
// newPipeProcessor returns a processor for pu, which writes results to ppNext.
//
// One shard is allocated per worker, so workersCount must cover every workerID
// later passed to writeBlock. The third argument is ignored.
func (pu *pipeUnroll) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
	pup := &pipeUnrollProcessor{
		pu:     pu,
		stopCh: stopCh,
		ppNext: ppNext,
	}
	pup.shards = make([]pipeUnrollProcessorShard, workersCount)
	return pup
}
// pipeUnrollProcessor is the pipeProcessor created by pipeUnroll.newPipeProcessor.
type pipeUnrollProcessor struct {
// pu is the pipe definition (fields to unroll and the optional if filter).
pu *pipeUnroll
// stopCh is checked via needStop before processing every row in writeBlock.
stopCh <-chan struct{}
// ppNext is the next processor, which receives the resulting rows.
ppNext pipeProcessor
// shards holds per-worker state; it is indexed by workerID in writeBlock.
shards []pipeUnrollProcessorShard
}
// pipeUnrollProcessorShard is the per-worker state of pipeUnrollProcessor,
// padded so that the struct size is a multiple of 128 bytes.
type pipeUnrollProcessorShard struct {
pipeUnrollProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0.
//
// Note that a full 128 bytes of padding is added when the size
// of pipeUnrollProcessorShardNopad is already a multiple of 128.
_ [128 - unsafe.Sizeof(pipeUnrollProcessorShardNopad{})%128]byte
}
// pipeUnrollProcessorShardNopad holds the reusable per-worker buffers of pipeUnrollProcessor.
type pipeUnrollProcessorShardNopad struct {
// bm marks the rows of the current block, which must be unrolled
// (all the rows when there is no if filter).
bm bitmap
// wctx is used for writing the resulting rows to the next pipe processor.
wctx pipeUnpackWriteContext
// a holds the data for the unpacked values; it is reset at the end of writeBlock.
a arena
// columnValues contains the values of the unrolled fields for the current block,
// one slice per field.
columnValues [][]string
// unrolledValues contains the unpacked array items for the current row,
// one slice per field. The slices point into valuesBuf.
unrolledValues [][]string
// valuesBuf is the backing buffer for unrolledValues.
valuesBuf []string
// fields is a reusable buffer for the fields passed to wctx.writeRow.
fields []Field
}
// writeBlock unrolls the rows from br and writes the results to the next pipe processor.
//
// workerID selects the shard with the reusable buffers.
//
// If the if filter is set and no rows in br match it, then br is passed to the next
// processor unchanged. Otherwise rows matching the filter are unrolled, while the rest
// of rows are written with the original values of the unrolled fields.
//
// Processing stops early, without flushing the pending rows, as soon as stopCh is signaled.
func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
	if br.rowsLen == 0 {
		return
	}

	shard := &pup.shards[workerID]
	shard.wctx.init(workerID, pup.ppNext, false, false, br)

	// Determine the rows to unroll.
	matched := &shard.bm
	matched.init(br.rowsLen)
	matched.setBits()
	if pup.pu.iff != nil {
		pup.pu.iff.f.applyToBlockResult(br, matched)
		if matched.isZero() {
			// Fast path - nothing to unroll in this block.
			pup.ppNext.writeBlock(workerID, br)
			return
		}
	}

	// Fetch the values of the fields to unroll.
	fieldNames := pup.pu.fields
	shard.columnValues = slicesutil.SetLength(shard.columnValues, len(fieldNames))
	valuesPerField := shard.columnValues
	for i := range fieldNames {
		col := br.getColumnByName(fieldNames[i])
		valuesPerField[i] = col.getValues(br)
	}

	rowFields := shard.fields
	for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
		if needStop(pup.stopCh) {
			return
		}

		if matched.isSetBit(rowIdx) {
			shard.writeUnrolledFields(fieldNames, valuesPerField, rowIdx)
			continue
		}

		// The row doesn't match the if filter - keep the original values.
		rowFields = rowFields[:0]
		for i, name := range fieldNames {
			rowFields = append(rowFields, Field{
				Name:  name,
				Value: valuesPerField[i][rowIdx],
			})
		}
		shard.wctx.writeRow(rowIdx, rowFields)
	}

	shard.wctx.flush()
	shard.wctx.reset()
	shard.a.reset()
}
// writeUnrolledFields unrolls the row at rowIdx and writes the resulting rows to shard.wctx.
//
// columnValues[i] must contain the block values for fieldNames[i]. Every value at rowIdx
// is unpacked via unpackJSONArray. The number of written rows equals the length
// of the longest unpacked array, with a minimum of one row. Fields with shorter arrays
// get empty values in the remaining rows.
//
// columnValues must contain at least one item.
func (shard *pipeUnrollProcessorShard) writeUnrolledFields(fieldNames []string, columnValues [][]string, rowIdx int) {
	// Unpack the array items for every field at the given row.
	shard.unrolledValues = slicesutil.SetLength(shard.unrolledValues, len(columnValues))
	unrolled := shard.unrolledValues

	buf := shard.valuesBuf[:0]
	for i := range columnValues {
		start := len(buf)
		buf = unpackJSONArray(buf, &shard.a, columnValues[i][rowIdx])
		unrolled[i] = buf[start:]
	}
	shard.valuesBuf = buf

	// The number of output rows is determined by the longest array.
	rowsCount := len(unrolled[0])
	for _, items := range unrolled[1:] {
		if n := len(items); n > rowsCount {
			rowsCount = n
		}
	}
	if rowsCount == 0 {
		// Unroll to a single row with empty unrolled values.
		rowsCount = 1
	}

	// Write the unrolled values to the next pipe.
	out := shard.fields
	for unrollIdx := 0; unrollIdx < rowsCount; unrollIdx++ {
		out = out[:0]
		for i, items := range unrolled {
			f := Field{
				Name: fieldNames[i],
			}
			if unrollIdx < len(items) {
				f.Value = items[unrollIdx]
			}
			out = append(out, f)
		}
		shard.wctx.writeRow(rowIdx, out)
	}
	shard.fields = out
}
// flush always returns nil.
//
// There is nothing to do here, since writeBlock flushes the per-shard
// write context after processing each block.
func (pup *pipeUnrollProcessor) flush() error {
return nil
}
// parsePipeUnroll parses the 'unroll [if (...)] [by] (field1, ..., fieldN)' pipe from lex.
//
// The lexer must be positioned at the "unroll" keyword. The "by" keyword is optional.
// An error is returned if the fields list cannot be parsed, is empty or contains "*".
func parsePipeUnroll(lex *lexer) (*pipeUnroll, error) {
	if !lex.isKeyword("unroll") {
		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unroll")
	}
	lex.nextToken()

	pu := &pipeUnroll{}

	// The `if (...)` part is optional.
	if lex.isKeyword("if") {
		iff, err := parseIfFilter(lex)
		if err != nil {
			return nil, err
		}
		pu.iff = iff
	}

	// The `by` keyword in front of the fields list is optional.
	if lex.isKeyword("by") {
		lex.nextToken()
	}
	fieldNames, err := parseFieldNamesInParens(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot parse 'by(...)' at 'unroll': %w", err)
	}
	switch {
	case len(fieldNames) == 0:
		return nil, fmt.Errorf("'by(...)' at 'unroll' must contain at least a single field")
	case slices.Contains(fieldNames, "*"):
		return nil, fmt.Errorf("unroll by '*' isn't supported")
	}
	pu.fields = fieldNames

	return pu, nil
}
// unpackJSONArray appends the items of the JSON array s to dst and returns the result.
//
// String items are appended in the form returned by StringBytes, copied via a.
// Other items are appended as the JSON text produced by MarshalTo, which is stored in a.b.
//
// dst is returned as is if s is empty, doesn't start with '[' or isn't a valid JSON array.
//
// The appended strings refer to the data owned by a.
func unpackJSONArray(dst []string, a *arena, s string) []string {
	if len(s) == 0 || s[0] != '[' {
		return dst
	}

	parser := jspp.Get()
	defer jspp.Put(parser)

	root, err := parser.Parse(s)
	if err != nil {
		return dst
	}
	items, err := root.Array()
	if err != nil {
		return dst
	}

	for _, item := range items {
		if item.Type() != fastjson.TypeString {
			// Store the JSON representation of the non-string item.
			start := len(a.b)
			a.b = item.MarshalTo(a.b)
			dst = append(dst, bytesutil.ToUnsafeString(a.b[start:]))
			continue
		}

		raw, err := item.StringBytes()
		if err != nil {
			logger.Panicf("BUG: unexpected error returned from StringBytes(): %s", err)
		}
		dst = append(dst, a.copyBytesToString(raw))
	}
	return dst
}
// jspp is a pool of JSON parsers used by unpackJSONArray.
var jspp fastjson.ParserPool