diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index 574d9bf7a..7cdaa08b3 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -16,6 +16,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip * FEATURE: added ability to receive systemd (journald) logs over network. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4618). +* FEATURE: improve performance for [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe) when it is applied directly to [log filter](https://docs.victoriametrics.com/victorialogs/logsql/#filters). * BUGFIX: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): fix various glitches with updating query responses. The issue was introduced in [v0.36.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.36.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7279). diff --git a/lib/logstorage/block_search.go b/lib/logstorage/block_search.go index c932cbc8d..c2e40f9d6 100644 --- a/lib/logstorage/block_search.go +++ b/lib/logstorage/block_search.go @@ -337,6 +337,14 @@ func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) { return id, ok } +func (bs *blockSearch) getColumnNameByID(columnNameID uint64) string { + columnNames := bs.bsw.p.columnNames + if columnNameID >= uint64(len(columnNames)) { + logger.Panicf("FATAL: %s: too big columnNameID=%d; it must be smaller than %d", bs.bsw.p.path, columnNameID, len(columnNames)) + } + return columnNames[columnNameID] +} + func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex { if bs.partFormatVersion() < 1 { logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", bs.partFormatVersion()) diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 9cbcce2b8..ee3bc7652 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1885,12 +1885,12 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | fields x,f1 | filter foo f1:bar | rm f2`, `f1,x`, ``) f(`* | rm x,f1 | filter foo f1:bar`, `*`, `f1,x`) - f(`* | field_names as foo`, `*`, `_time`) - f(`* | field_names foo | fields bar`, `*`, `_time`) - f(`* | field_names foo | fields foo`, `*`, `_time`) - f(`* | field_names foo | rm foo`, `*`, `_time`) - f(`* | field_names foo | rm bar`, `*`, `_time`) - f(`* | field_names foo | rm _time`, `*`, `_time`) + f(`* | field_names as foo`, ``, ``) + f(`* | field_names foo | fields bar`, ``, ``) + f(`* | field_names foo | fields foo`, ``, ``) + f(`* | field_names foo | rm foo`, ``, ``) + f(`* | field_names foo | rm bar`, ``, ``) + f(`* | field_names foo | rm _time`, ``, ``) f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``) f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`) @@ -2012,7 +2012,7 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | extract if (q:w p:a) "bar" from x | count() r1`, `p,q`, ``) f(`* | extract_regexp "(?P.*)bar" from x | count() r1`, ``, ``) f(`* | extract_regexp if (q:w p:a) "(?P.*)bar" from x | count() r1`, `p,q`, ``) - f(`* | field_names | count() r1`, `*`, `_time`) + f(`* | field_names | count() r1`, ``, ``) f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``) f(`* | blocks_count | count() r1`, ``, ``) f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``) diff --git a/lib/logstorage/pipe_field_names.go b/lib/logstorage/pipe_field_names.go index 8d6e393fb..283cb7675 100644 --- a/lib/logstorage/pipe_field_names.go +++ b/lib/logstorage/pipe_field_names.go @@ -14,9 +14,7 @@ type pipeFieldNames struct { // By default results are written into 'name' column. resultName string - // isFirstPipe is set to true if '| field_names' pipe is the first in the query. - // - // This allows skipping loading of _time column. + // if isFirstPipe is set, then there is no need in loading columnsHeader in writeBlock(). isFirstPipe bool } @@ -33,12 +31,12 @@ func (pf *pipeFieldNames) canLiveTail() bool { } func (pf *pipeFieldNames) updateNeededFields(neededFields, unneededFields fieldsSet) { - neededFields.add("*") - unneededFields.reset() - if pf.isFirstPipe { - unneededFields.add("_time") + neededFields.reset() + } else { + neededFields.add("*") } + unneededFields.reset() } func (pf *pipeFieldNames) optimize() { @@ -98,25 +96,48 @@ func (pfp *pipeFieldNamesProcessor) writeBlock(workerID uint, br *blockResult) { return } + // Assume that the column is set for all the rows in the block. + // This is much faster than reading all the column values and counting non-empty rows. + hits := uint64(br.rowsLen) + shard := &pfp.shards[workerID] - m := shard.getM() - - cs := br.getColumns() - for _, c := range cs { - pHits := m[c.name] - if pHits == nil { - nameCopy := strings.Clone(c.name) - hits := uint64(0) - pHits = &hits - m[nameCopy] = pHits + if !pfp.pf.isFirstPipe || br.bs == nil || br.bs.partFormatVersion() < 1 { + cs := br.getColumns() + for _, c := range cs { + shard.updateColumnHits(c.name, hits) } - - // Assume that the column is set for all the rows in the block. - // This is much faster than reading all the column values and counting non-empty rows. - *pHits += uint64(br.rowsLen) + } else { + cshIndex := br.bs.getColumnsHeaderIndex() + shard.updateHits(cshIndex.columnHeadersRefs, br, hits) + shard.updateHits(cshIndex.constColumnsRefs, br, hits) + shard.updateColumnHits("_time", hits) + shard.updateColumnHits("_stream", hits) + shard.updateColumnHits("_stream_id", hits) } } +func (shard *pipeFieldNamesProcessorShard) updateHits(refs []columnHeaderRef, br *blockResult, hits uint64) { + for _, cr := range refs { + columnName := br.bs.getColumnNameByID(cr.columnNameID) + shard.updateColumnHits(columnName, hits) + } +} + +func (shard *pipeFieldNamesProcessorShard) updateColumnHits(columnName string, hits uint64) { + if columnName == "" { + columnName = "_msg" + } + m := shard.getM() + pHits := m[columnName] + if pHits == nil { + nameCopy := strings.Clone(columnName) + hits := uint64(0) + pHits = &hits + m[nameCopy] = pHits + } + *pHits += hits +} + func (pfp *pipeFieldNamesProcessor) flush() error { if needStop(pfp.stopCh) { return nil @@ -136,14 +157,6 @@ func (pfp *pipeFieldNamesProcessor) flush() error { } } } - if pfp.pf.isFirstPipe { - pHits := m["_stream"] - if pHits == nil { - hits := uint64(0) - pHits = &hits - } - m["_time"] = pHits - } // write result wctx := &pipeFieldNamesWriteContext{