VictoriaMetrics/lib/logstorage/pipe_field_names.go

236 lines
5.4 KiB
Go
Raw Permalink Normal View History

2024-05-20 02:08:30 +00:00
package logstorage
import (
"fmt"
"strings"
"unsafe"
)
// pipeFieldNames processes '| field_names' pipe.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#field_names-pipe
2024-05-20 02:08:30 +00:00
type pipeFieldNames struct {
2024-05-24 01:06:55 +00:00
// resultName is an optional name of the column to write results to.
// By default results are written into 'name' column.
2024-05-20 02:08:30 +00:00
resultName string
// if isFirstPipe is set, then there is no need in loading columnsHeader in writeBlock().
2024-05-20 02:08:30 +00:00
isFirstPipe bool
}
func (pf *pipeFieldNames) String() string {
2024-05-24 01:06:55 +00:00
s := "field_names"
if pf.resultName != "name" {
s += " as " + quoteTokenIfNeeded(pf.resultName)
}
return s
2024-05-20 02:08:30 +00:00
}
2024-06-27 12:18:42 +00:00
func (pf *pipeFieldNames) canLiveTail() bool {
return false
}
2024-05-20 02:08:30 +00:00
func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) {
if pf.isFirstPipe {
neededFields.reset()
} else {
neededFields.add("*")
2024-05-20 02:08:30 +00:00
}
unneededFields.reset()
2024-05-20 02:08:30 +00:00
}
2024-05-25 19:36:16 +00:00
func (pf *pipeFieldNames) hasFilterInWithQuery() bool {
return false
}
func (pf *pipeFieldNames) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
2024-05-25 19:36:16 +00:00
return pf, nil
}
func (pf *pipeFieldNames) newPipeProcessor(workersCount int, stopCh <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
2024-05-20 02:08:30 +00:00
shards := make([]pipeFieldNamesProcessorShard, workersCount)
pfp := &pipeFieldNamesProcessor{
pf: pf,
stopCh: stopCh,
2024-05-25 19:36:16 +00:00
ppNext: ppNext,
2024-05-20 02:08:30 +00:00
shards: shards,
}
return pfp
}
type pipeFieldNamesProcessor struct {
pf *pipeFieldNames
stopCh <-chan struct{}
2024-05-25 19:36:16 +00:00
ppNext pipeProcessor
2024-05-20 02:08:30 +00:00
shards []pipeFieldNamesProcessorShard
}
type pipeFieldNamesProcessorShard struct {
pipeFieldNamesProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [128 - unsafe.Sizeof(pipeFieldNamesProcessorShardNopad{})%128]byte
}
type pipeFieldNamesProcessorShardNopad struct {
2024-05-24 01:06:55 +00:00
// m holds hits per each field name
m map[string]*uint64
}
func (shard *pipeFieldNamesProcessorShard) getM() map[string]*uint64 {
if shard.m == nil {
shard.m = make(map[string]*uint64)
}
return shard.m
2024-05-20 02:08:30 +00:00
}
func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) {
if br.rowsLen == 0 {
2024-05-20 02:08:30 +00:00
return
}
// Assume that the column is set for all the rows in the block.
// This is much faster than reading all the column values and counting non-empty rows.
hits := uint64(br.rowsLen)
2024-05-24 01:06:55 +00:00
shard := &pfp.shards[workerID]
if !pfp.pf.isFirstPipe || br.bs == nil || br.bs.partFormatVersion() < 1 {
cs := br.getColumns()
for _, c := range cs {
shard.updateColumnHits(c.name, hits)
2024-05-20 02:08:30 +00:00
}
} else {
cshIndex := br.bs.getColumnsHeaderIndex()
shard.updateHits(cshIndex.columnHeadersRefs, br, hits)
shard.updateHits(cshIndex.constColumnsRefs, br, hits)
shard.updateColumnHits("_time", hits)
shard.updateColumnHits("_stream", hits)
shard.updateColumnHits("_stream_id", hits)
}
}
2024-05-24 01:06:55 +00:00
func (shard *pipeFieldNamesProcessorShard) updateHits(refs []columnHeaderRef, br *blockResult, hits uint64) {
for _, cr := range refs {
columnName := br.bs.getColumnNameByID(cr.columnNameID)
shard.updateColumnHits(columnName, hits)
2024-05-20 02:08:30 +00:00
}
}
func (shard *pipeFieldNamesProcessorShard) updateColumnHits(columnName string, hits uint64) {
if columnName == "" {
columnName = "_msg"
}
m := shard.getM()
pHits := m[columnName]
if pHits == nil {
nameCopy := strings.Clone(columnName)
hits := uint64(0)
pHits = &hits
m[nameCopy] = pHits
}
*pHits += hits
}
2024-05-20 02:08:30 +00:00
func (pfp *pipeFieldNamesProcessor) flush() error {
if needStop(pfp.stopCh) {
return nil
}
// merge state across shards
shards := pfp.shards
2024-05-24 01:06:55 +00:00
m := shards[0].getM()
2024-05-20 02:08:30 +00:00
shards = shards[1:]
for i := range shards {
2024-05-24 01:06:55 +00:00
for name, pHitsSrc := range shards[i].getM() {
pHits := m[name]
if pHits == nil {
2024-05-24 01:06:55 +00:00
m[name] = pHitsSrc
} else {
*pHits += *pHitsSrc
}
2024-05-20 02:08:30 +00:00
}
}
// write result
wctx := &pipeFieldNamesWriteContext{
pfp: pfp,
}
wctx.rcs[0].name = pfp.pf.resultName
2024-05-24 01:06:55 +00:00
wctx.rcs[1].name = "hits"
for name, pHits := range m {
hits := string(marshalUint64String(nil, *pHits))
wctx.writeRow(name, hits)
2024-05-20 02:08:30 +00:00
}
wctx.flush()
return nil
}
type pipeFieldNamesWriteContext struct {
pfp *pipeFieldNamesProcessor
2024-05-24 01:06:55 +00:00
rcs [2]resultColumn
2024-05-20 02:08:30 +00:00
br blockResult
2024-05-22 19:01:20 +00:00
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
2024-05-20 02:08:30 +00:00
valuesLen int
}
2024-05-24 01:06:55 +00:00
func (wctx *pipeFieldNamesWriteContext) writeRow(name, hits string) {
wctx.rcs[0].addValue(name)
wctx.rcs[1].addValue(hits)
wctx.valuesLen += len(name) + len(hits)
2024-05-22 19:01:20 +00:00
wctx.rowsCount++
2024-05-20 02:08:30 +00:00
if wctx.valuesLen >= 1_000_000 {
wctx.flush()
}
}
func (wctx *pipeFieldNamesWriteContext) flush() {
br := &wctx.br
wctx.valuesLen = 0
2024-05-25 19:36:16 +00:00
// Flush rcs to ppNext
2024-05-24 01:06:55 +00:00
br.setResultColumns(wctx.rcs[:], wctx.rowsCount)
2024-05-22 19:01:20 +00:00
wctx.rowsCount = 0
2024-05-25 19:36:16 +00:00
wctx.pfp.ppNext.writeBlock(0, br)
2024-05-20 02:08:30 +00:00
br.reset()
wctx.rcs[0].resetValues()
2024-05-24 01:06:55 +00:00
wctx.rcs[1].resetValues()
2024-05-20 02:08:30 +00:00
}
func parsePipeFieldNames(lex *lexer) (*pipeFieldNames, error) {
if !lex.isKeyword("field_names") {
return nil, fmt.Errorf("expecting 'field_names'; got %q", lex.token)
}
lex.nextToken()
2024-05-24 01:06:55 +00:00
resultName := "name"
2024-05-20 02:08:30 +00:00
if lex.isKeyword("as") {
lex.nextToken()
2024-05-24 01:06:55 +00:00
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
} else if !lex.isKeyword("", "|") {
name, err := parseFieldName(lex)
if err != nil {
return nil, fmt.Errorf("cannot parse result name for 'field_names': %w", err)
}
resultName = name
2024-05-20 02:08:30 +00:00
}
pf := &pipeFieldNames{
resultName: resultName,
}
return pf, nil
}