VictoriaMetrics/lib/logstorage/pipe_field_names.go

223 lines
4.8 KiB
Go
Raw Normal View History

2024-05-20 02:08:30 +00:00
package logstorage
import (
"fmt"
"strings"
"unsafe"
)
// pipeFieldNames processes '| field_names' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field-names-pipe
type pipeFieldNames struct {
	// resultName is an optional name of the column to write results to.
	// By default results are written into 'name' column.
	resultName string

	// isFirstPipe is set to true if '| field_names' pipe is the first in the query.
	//
	// This allows skipping loading of _time column.
	isFirstPipe bool
}
func (pf *pipeFieldNames) String() string {
2024-05-24 01:06:55 +00:00
s := "field_names"
if pf.resultName != "name" {
s += " as " + quoteTokenIfNeeded(pf.resultName)
}
return s
2024-05-20 02:08:30 +00:00
}
// updateNeededFields registers "*" in neededFields and resets unneededFields.
//
// If the pipe is the first one in the query, "_time" is additionally put into
// unneededFields; pipeFieldNamesProcessor.flush adds the "_time" entry back
// into the results in this case.
func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) {
	neededFields.add("*")
	unneededFields.reset()

	if !pf.isFirstPipe {
		return
	}
	unneededFields.add("_time")
}
2024-05-25 19:36:16 +00:00
// optimize is a no-op for the 'field_names' pipe.
func (pf *pipeFieldNames) optimize() {
	// Intentionally empty: there is nothing to optimize here.
}
// hasFilterInWithQuery always returns false for the 'field_names' pipe.
func (pf *pipeFieldNames) hasFilterInWithQuery() bool {
	return false
}
// initFilterInValues returns pf unchanged together with a nil error.
//
// Both arguments are ignored.
func (pf *pipeFieldNames) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
	return pf, nil
}
func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
2024-05-20 02:08:30 +00:00
shards := make([]pipeFieldNamesProcessorShard, workersCount)
pfp := &pipeFieldNamesProcessor{
pf: pf,
stopCh: stopCh,
2024-05-25 19:36:16 +00:00
ppNext: ppNext,
2024-05-20 02:08:30 +00:00
shards: shards,
}
return pfp
}
type pipeFieldNamesProcessor struct {
pf *pipeFieldNames
stopCh <-chan struct{}
2024-05-25 19:36:16 +00:00
ppNext pipeProcessor
2024-05-20 02:08:30 +00:00
shards []pipeFieldNamesProcessorShard
}
// pipeFieldNamesProcessorShard is a single element of pipeFieldNamesProcessor.shards.
//
// It wraps pipeFieldNamesProcessorShardNopad and appends padding bytes to it.
type pipeFieldNamesProcessorShard struct {
	pipeFieldNamesProcessorShardNopad

	// Pad the shard to a multiple of 128 bytes, so neighbouring shards do not suffer
	// from false sharing on widespread platforms with 128 mod (cache line size) = 0.
	_ [128 - unsafe.Sizeof(pipeFieldNamesProcessorShardNopad{})%128]byte
}
// pipeFieldNamesProcessorShardNopad holds the shard state without the padding
// added by pipeFieldNamesProcessorShard.
type pipeFieldNamesProcessorShardNopad struct {
	// m holds hits per each field name
	m map[string]*uint64
}
func (shard *pipeFieldNamesProcessorShard) getM() map[string]*uint64 {
if shard.m == nil {
shard.m = make(map[string]*uint64)
}
return shard.m
2024-05-20 02:08:30 +00:00
}
func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) {
if len(br.timestamps) == 0 {
return
}
shard := &pfp.shards[workerID]
2024-05-24 01:06:55 +00:00
m := shard.getM()
2024-05-20 02:08:30 +00:00
cs := br.getColumns()
for _, c := range cs {
2024-05-24 01:06:55 +00:00
pHits, ok := m[c.name]
if !ok {
2024-05-20 02:08:30 +00:00
nameCopy := strings.Clone(c.name)
2024-05-24 01:06:55 +00:00
hits := uint64(0)
pHits = &hits
m[nameCopy] = pHits
2024-05-20 02:08:30 +00:00
}
2024-05-24 01:06:55 +00:00
// Assume that the column is set for all the rows in the block.
// This is much faster than reading all the column values and counting non-empty rows.
*pHits += uint64(len(br.timestamps))
2024-05-20 02:08:30 +00:00
}
}
func (pfp *pipeFieldNamesProcessor) flush() error {
if needStop(pfp.stopCh) {
return nil
}
// merge state across shards
shards := pfp.shards
2024-05-24 01:06:55 +00:00
m := shards[0].getM()
2024-05-20 02:08:30 +00:00
shards = shards[1:]
for i := range shards {
2024-05-24 01:06:55 +00:00
for name, pHitsSrc := range shards[i].getM() {
pHits, ok := m[name]
if !ok {
m[name] = pHitsSrc
} else {
*pHits += *pHitsSrc
}
2024-05-20 02:08:30 +00:00
}
}
if pfp.pf.isFirstPipe {
2024-05-24 01:06:55 +00:00
pHits := m["_stream"]
if pHits == nil {
hits := uint64(0)
pHits = &hits
}
m["_time"] = pHits
2024-05-20 02:08:30 +00:00
}
// write result
wctx := &pipeFieldNamesWriteContext{
pfp: pfp,
}
wctx.rcs[0].name = pfp.pf.resultName
2024-05-24 01:06:55 +00:00
wctx.rcs[1].name = "hits"
for name, pHits := range m {
hits := string(marshalUint64String(nil, *pHits))
wctx.writeRow(name, hits)
2024-05-20 02:08:30 +00:00
}
wctx.flush()
return nil
}
type pipeFieldNamesWriteContext struct {
pfp *pipeFieldNamesProcessor
2024-05-24 01:06:55 +00:00
rcs [2]resultColumn
2024-05-20 02:08:30 +00:00
br blockResult
2024-05-22 19:01:20 +00:00
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
2024-05-20 02:08:30 +00:00
valuesLen int
}
2024-05-24 01:06:55 +00:00
func (wctx *pipeFieldNamesWriteContext) writeRow(name, hits string) {
wctx.rcs[0].addValue(name)
wctx.rcs[1].addValue(hits)
wctx.valuesLen += len(name) + len(hits)
2024-05-22 19:01:20 +00:00
wctx.rowsCount++
2024-05-20 02:08:30 +00:00
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeFieldNamesWriteContext) flush() {
br := &wctx.br
wctx.valuesLen = 0
2024-05-25 19:36:16 +00:00
// Flush rcs to ppNext
2024-05-24 01:06:55 +00:00
br.setResultColumns(wctx.rcs[:], wctx.rowsCount)
2024-05-22 19:01:20 +00:00
wctx.rowsCount = 0
2024-05-25 19:36:16 +00:00
wctx.pfp.ppNext.writeBlock(0, br)
2024-05-20 02:08:30 +00:00
br.reset()
wctx.rcs[0].resetValues()
2024-05-24 01:06:55 +00:00
wctx.rcs[1].resetValues()
2024-05-20 02:08:30 +00:00
}
func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
if !lex.isKeyword("field_names") {
return nil, fmt.Errorf("expecting 'field_names'; got %q", lex.token)
}
lex.nextToken()
2024-05-24 01:06:55 +00:00
resultName := "name"
2024-05-20 02:08:30 +00:00
if lex.isKeyword("as") {
lex.nextToken()
2024-05-24 01:06:55 +00:00
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
} else if !lex.isKeyword("", "|") {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
2024-05-20 02:08:30 +00:00
}
pf := &pipeFieldNames{
resultName: resultName,
}
return pf, nil
}