From 79c03fc35fc2bdd0fa87d538c53e092faaf58ef2 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Tue, 28 May 2024 19:29:41 +0200
Subject: [PATCH] lib/logstorage: work-in-progress

---
 docs/VictoriaLogs/CHANGELOG.md             |  12 +
 docs/VictoriaLogs/LogsQL.md                | 164 ++++-
 lib/logstorage/filter_and.go               | 144 ++--
 lib/logstorage/filter_in.go                | 102 ++-
 lib/logstorage/filter_in_test.go           |  31 +
 lib/logstorage/filter_or.go                | 112 +++
 lib/logstorage/filter_string_range.go      |   8 +-
 lib/logstorage/parser.go                   |  88 ++-
 lib/logstorage/parser_test.go              |  43 +-
 lib/logstorage/pipe.go                     |  42 +-
 lib/logstorage/pipe_delete.go              |   4 +-
 lib/logstorage/pipe_extract.go             |  20 +-
 lib/logstorage/pipe_extract_regexp.go      | 334 +++++++++
 lib/logstorage/pipe_extract_regexp_test.go | 329 +++++++++
 lib/logstorage/pipe_extract_test.go        |  28 -
 lib/logstorage/pipe_field_values.go        |  93 +++
 lib/logstorage/pipe_field_values_test.go   | 148 ++++
 lib/logstorage/pipe_filter.go              |  10 +-
 lib/logstorage/pipe_filter_test.go         |  21 +
 lib/logstorage/pipe_format.go              |  12 +-
 lib/logstorage/pipe_limit.go               |  17 +-
 lib/logstorage/pipe_limit_test.go          |  12 +-
 lib/logstorage/pipe_math.go                | 776 +++++++++++++++++++++
 lib/logstorage/pipe_math_test.go           | 233 +++++++
 lib/logstorage/pipe_replace.go             |  10 +-
 lib/logstorage/pipe_replace_regexp.go      |  10 +-
 lib/logstorage/pipe_stats.go               |  11 +-
 lib/logstorage/pipe_stats_test.go          |  90 +++
 lib/logstorage/pipe_unpack.go              |  10 +-
 lib/logstorage/pipe_utils_test.go          |  28 +
 lib/logstorage/storage_search.go           |   6 +-
 lib/logstorage/storage_search_test.go      | 152 ++++
 32 files changed, 2911 insertions(+), 189 deletions(-)
 create mode 100644 lib/logstorage/pipe_extract_regexp.go
 create mode 100644 lib/logstorage/pipe_extract_regexp_test.go
 create mode 100644 lib/logstorage/pipe_field_values.go
 create mode 100644 lib/logstorage/pipe_field_values_test.go
 create mode 100644 lib/logstorage/pipe_math.go
 create mode 100644 lib/logstorage/pipe_math_test.go

diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md
index b569f61958..66418ffe46 100644
--- a/docs/VictoriaLogs/CHANGELOG.md
+++ b/docs/VictoriaLogs/CHANGELOG.md
@@ -19,6 +19,18 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
 
 ## tip
 
+## [v0.13.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.13.0-victorialogs)
+
+Released at 2024-05-28
+
+* FEATURE: add [`extract_regexp` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) for extracting arbitrary substrings from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with [RE2 egular expressions](https://github.com/google/re2/wiki/Syntax).
+* FEATURE: add [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+* FEATURE: add [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe), which returns unique values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+* FEATURE: allow omitting `stats` prefix in [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe). For example, `_time:5m | count() rows` is a valid query now. It is equivalent to `_time:5m | stats count() as rows`.
+* FEATURE: allow omitting `filter` prefix in [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe) if the filter doesn't clash with [pipe names](#https://docs.victoriametrics.com/victorialogs/logsql/#pipes). For example, `_time:5m | stats by (host) count() rows | rows:>1000` is a valid query now. It is equivalent to `_time:5m | stats by (host) count() rows | filter rows:>1000`.
+* FEATURE: allow [`head` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) without number. For example, `error | head`. In this case 10 last values are returned as `head` Unix command does by default.
+* FEATURE: allow using [comparison filters](https://docs.victoriametrics.com/victorialogs/logsql/#range-comparison-filters) with strings. For example, `some_text_field:>="foo"` matches [log entries](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with `some_text_field` field values bigger or equal to `foo`.
+
 ## [v0.12.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.12.1-victorialogs)
 
 Released at 2024-05-26
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 8b6e17467e..72317d1a4c 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -255,6 +255,7 @@ The list of LogsQL filters:
 - [Phrase filter](#phrase-filter) - matches logs with the given phrase
 - [Prefix filter](#prefix-filter) - matches logs with the given word prefix or phrase prefix
 - [Substring filter](#substring-filter) - matches logs with the given substring
+- [Range comparison filter](#range-comparison-filter) - matches logs with field values in the provided range
 - [Empty value filter](#empty-value-filter) - matches logs without the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 - [Any value filter](#any-value-filter) - matches logs with the given non-empty [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
 - [Exact filter](#exact-filter) - matches logs with the exact value
@@ -576,6 +577,26 @@ See also:
 - [Regexp filter](#regexp-filter)
 
 
+### Range comparison filter
+
+LogsQL supports `field:>X`, `field:>=X`, `field:<X` and `field:<=X` filters, where `field` is the name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+and `X` is either [numeric value](#numeric-values) or a string. For example, the following query returns logs containing numeric values for the `response_size` field bigger than `10*1024`:
+
+```logsql
+response_size:>10KiB
+```
+
+The following query returns logs with `user` field containing string values smaller than 'John`:
+
+```logsql
+username:<"John"
+```
+
+See also:
+
+- [String range filter](#string-range-filter)
+- [Range filter](#range-filter)
+
 ### Empty value filter
 
 Sometimes it is needed to find log entries without the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -906,18 +927,12 @@ for searching for log entries with request durations exceeding 4.2 seconds:
 request.duration:range(4.2, Inf)
 ```
 
-This query can be shortened to:
+This query can be shortened to by using [range comparison filter](#range-comparison-filter):
 
 ```logsql
 request.duration:>4.2
 ```
 
-The following query returns logs with request durations smaller or equal to 1.5 seconds:
-
-```logsql
-request.duration:<=1.5
-```
-
 The lower and the upper bounds of the `range(lower, upper)` are excluded by default. If they must be included, then substitute the corresponding
 parentheses with square brackets. For example:
 
@@ -941,6 +956,7 @@ Performance tips:
 
 See also:
 
+- [Range comparison filter](#range-comparison-filter)
 - [IPv4 range filter](#ipv4-range-filter)
 - [String range filter](#string-range-filter)
 - [Length range filter](#length-range-filter)
@@ -1012,6 +1028,7 @@ For example, the `user.name:string_range(C, E)` would match `user.name` fields,
 
 See also:
 
+- [Range comparison filter](#range-comparison-filter)
 - [Range filter](#range-filter)
 - [IPv4 range filter](#ipv4-range-filter)
 - [Length range filter](#length-range-filter)
@@ -1135,11 +1152,14 @@ LogsQL supports the following pipes:
 - [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`extract`](#extract-pipe) extracts the sepcified text into the given log fields.
+- [`extract_regexp`](#extract_regexp-pipe) extracts the sepcified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax).
 - [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`field_values`](#field_values-pipe) returns all the values for the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`fields`](#fields-pipe) selects the given set of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`filter`](#filter-pipe) applies additional [filters](#filters) to results.
 - [`format`](#format-pipe) formats ouptut field from input [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`limit`](#limit-pipe) limits the number selected logs.
+- [`math`](#math-pipe) performs mathematical calculations over [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
 - [`offset`](#offset-pipe) skips the given number of selected logs.
 - [`pack_json`](#pack_json-pipe) packs [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into JSON object.
 - [`rename`](#rename-pipe) renames [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1188,7 +1208,7 @@ For example, the following query deletes `host` and `app` fields from the logs o
 _time:5m | delete host, app
 ```
 
-`del` and `rm` keywords can be used instead of `delete` for convenience. For example, `_time:5m | del host` is equivalent to `_time:5m | rm host` and `_time:5m | delete host`.
+`drop`, `del` and `rm` keywords can be used instead of `delete` for convenience. For example, `_time:5m | drop host` is equivalent to `_time:5m | delete host`.
 
 See also:
 
@@ -1251,6 +1271,7 @@ See also:
 - [Conditional extract](#conditional-extract)
 - [`unpack_json` pipe](#unpack_json-pipe)
 - [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
+- [`math` pipe](#math-pipe)
 
 #### Format for extract pipe pattern
 
@@ -1334,6 +1355,34 @@ For example, the following query is equivalent to the previous one:
 _time:5m | extract "ip=<ip> " keep_original_fields
 ```
 
+### extract_regexp pipe
+
+`| extract_regexp "pattern" from field_name` [pipe](#pipes) extracts substrings from the [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+according to the provided `pattern`, and stores them into field names according to the named fields inside the `pattern`.
+The `pattern` must contain [RE2 regular expression](https://github.com/google/re2/wiki/Syntax) with named fields (aka capturing groups) in the form `(?P<capture_field_name>...)`.
+Matching substrings are stored to the given `capture_field_name` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+For example, the following query extracts ipv4 addresses from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
+and puts them into `ip` field for logs over the last 5 minutes:
+
+```logsql
+_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" from _msg
+```
+
+The `from _msg` part can be omitted if the data extraction is performed from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
+So the following query is equivalent to the previous one:
+
+```logsql
+_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"
+```
+
+Performance tip: it is recommended using [`extract` pipe](#extract-pipe) instead of `extract_regexp` for achieving higher query performance.
+
+See also:
+
+- [`extract` pipe](#extract-pipe)
+- [`replace_regexp` pipe](#replace_regexp-pipe)
+- [`unpack_json` pipe](#unpack_json-pipe)
+
 ### field_names pipe
 
 `| field_names` [pipe](#pipes) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
@@ -1348,8 +1397,33 @@ Field names are returned in arbitrary order. Use [`sort` pipe](#sort-pipe) in or
 
 See also:
 
+- [`field_values` pipe](#field_values-pipe)
 - [`uniq` pipe](#uniq-pipe)
 
+### field_values pipe
+
+`| field_values field_name` [pipe](#pipe) returns all the values for the given [`field_name` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+with the number of logs per each value.
+For example, the following query returns all the values with the number of matching logs for the field `level` over logs logs for the last 5 minutes:
+
+```logsql
+_time:5m | field_values level
+```
+
+It is possible limiting the number of returned values by adding `limit N` to the end of the `field_values ...`. For example, the following query returns
+up to 10 values for the field `user_id` over logs for the last 5 minutes:
+
+```logsql
+_time:5m | field_values user_id limit 10
+```
+
+If the limit is reached, then the set of returned values is random. Also the number of matchin logs per each returned value is zeroed for performance reasons.
+
+See also:
+
+- [`field_names` pipe](#field_names-pipe)
+- [`uniq` pipe)(#uniq-pipe)
+
 ### fields pipe
 
 By default all the [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) are returned in the response.
@@ -1374,8 +1448,7 @@ See also:
 
 ### filter pipe
 
-Sometimes it is needed to apply additional filters on the calculated results. This can be done with `| filter ...` [pipe](#pipes).
-The `filter` pipe can contain arbitrary [filters](#filters).
+The `| filter ...` [pipe](#pipes) allows filtering the selected logs entries with arbitrary [filters](#filters).
 
 For example, the following query returns `host` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) values
 if the number of log messages with the `error` [word](#word) for them over the last hour exceeds `1_000`:
@@ -1384,6 +1457,13 @@ if the number of log messages with the `error` [word](#word) for them over the l
 _time:1h error | stats by (host) count() logs_count | filter logs_count:> 1_000
 ```
 
+It is allowed to omit `filter` prefix if the used filters do not clash with [pipe names](#pipes).
+So the following query is equivalent to the previous one:
+
+```logsql
+_time:1h error | stats by (host) count() logs_count | logs_count:> 1_000
+```
+
 See also:
 
 - [`stats` pipe](#stats-pipe)
@@ -1463,6 +1543,12 @@ _time:5m | limit 100
 
 `head` keyword can be used instead of `limit` for convenience. For example, `_time:5m | head 100` is equivalent to `_time:5m | limit 100`.
 
+The `N` in `head N` can be omitted - in this case up to 10 matching logs are returned:
+
+```logsql
+error | head
+```
+
 By default rows are selected in arbitrary order because of performance reasons, so the query above can return different sets of logs every time it is executed.
 [`sort` pipe](#sort-pipe) can be used for making sure the logs are in the same order before applying `limit ...` to them.
 
@@ -1471,6 +1557,50 @@ See also:
 - [`sort` pipe](#sort-pipe)
 - [`offset` pipe](#offset-pipe)
 
+### math pipe
+
+`| math ...` [pipe](#pipes) performs mathematical calculations over numeric values stored in [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+For example, the following query divides `duration_msecs` field value by 1000, then rounds them to integer and stores the result in the `duration_secs` field:
+
+```logsql
+_time:5m | math round(duration_msecs / 1000) as duration_secs
+```
+
+The following mathematical operations are supported by `math` pipe:
+
+- `arg1 + arg2` - returns the sum of `arg1` and `arg2`
+- `arg1 - arg2` - returns the difference between `arg1` and `arg2`
+- `arg1 * arg2` - multiplies `arg1` by `arg2`
+- `arg1 / arg2` - divides `arg1` by `arg2`
+- `arg1 % arg2` - returns the remainder of the division of `arg1` by `arg2`
+- `arg1 ^ arg2` - returns the power of `arg1` by `arg2`
+- `abs(arg)` - returns an absolute values for the given `arg`
+- `max(arg1, ..., argN)` - returns the maximum value among the given `arg1`, ..., `argN`
+- `min(arg1, ..., argN)` - returns the minimum value among the given `arg1`, ..., `argN`
+- `round(arg)` - returns rounded to integer value for the given `arg`. The `round()` accepts optional `nearest` arg, which allows rounding the number to the given `nearest` multiple.
+  For example, `round(temperature, 0.1)` rounds `temperature` field to one decimal digit after the point.
+
+Every `argX` argument in every mathematical operation can contain one of the following values:
+
+- The name of [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, `errors_total / requests_total`.
+- Any [supported numeric value](#numeric-values). For example, `response_size_bytes / 1MiB`.
+- Another mathematical expression. Optionally, it may be put inside `(...)`. For example, `(a + b) * c`.
+
+Multiple distinct results can be calculated in a single `math ...` pipe - just separate them with `,`. For example, the following query calculates the error rate
+and the number of successful requests from `errors`, `warnings` and `requests` [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
+
+```logsql
+_time:5m | math
+    (errors / requests) as error_rate,
+    (requests - (errors + warnings)) as success_requests
+```
+
+See also:
+
+- [`stats` pipe](#stats-pipe)
+- [`extract` pipe](#extract-pipe)
+
+
 ### offset pipe
 
 If some selected logs must be skipped after [`sort`](#sort-pipe), then `| offset N` [pipe](#pipes) can be used, where `N` can contain any [supported integer numeric value](#numeric-values).
@@ -1738,6 +1868,12 @@ For example, the following query calculates the following stats for logs over th
 _time:5m | stats count() logs_total, count_uniq(_stream) streams_total
 ```
 
+It is allowed to omit `stats` prefix for convenience. So the following query is equivalent to the previous one:
+
+```logsql
+_time:5m | count() logs_total, count_uniq(_stream) streams_total
+```
+
 See also:
 
 - [stats by fields](#stats-by-fields)
@@ -1747,6 +1883,7 @@ See also:
 - [stats by IPv4 buckets](#stats-by-ipv4-buckets)
 - [stats with additional filters](#stats-with-additional-filters)
 - [stats pipe functions](#stats-pipe-functions)
+- [`math` pipe](#math-pipe)
 - [`sort` pipe](#sort-pipe)
 
 
@@ -2462,12 +2599,7 @@ LogsQL supports the following transformations on the log entries selected with [
 - Creating a new field from existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) according to the provided format. See [`format` pipe](#format-pipe).
 - Replacing substrings in the given [log field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
   See [`replace` pipe](#replace-pipe) and [`replace_regexp` pipe](#replace_regexp-pipe) docs.
-
-LogsQL will support the following transformations in the future:
-
-- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-
-See the [Roadmap](https://docs.victoriametrics.com/victorialogs/roadmap/) for details.
+- Creating a new field according to math calculations over existing [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [`math` pipe](#math-pipe).
 
 It is also possible to perform various transformations on the [selected log entries](#filters) at client side
 with `jq`, `awk`, `cut`, etc. Unix commands according to [these docs](https://docs.victoriametrics.com/victorialogs/querying/#command-line).
diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go
index caea8ae6fb..08bf8736ae 100644
--- a/lib/logstorage/filter_and.go
+++ b/lib/logstorage/filter_and.go
@@ -13,8 +13,13 @@ import (
 type filterAnd struct {
 	filters []filter
 
-	msgTokensOnce sync.Once
-	msgTokens     []string
+	byFieldTokensOnce sync.Once
+	byFieldTokens     []fieldTokens
+}
+
+type fieldTokens struct {
+	field  string
+	tokens []string
 }
 
 func (fa *filterAnd) String() string {
@@ -22,8 +27,7 @@ func (fa *filterAnd) String() string {
 	a := make([]string, len(filters))
 	for i, f := range filters {
 		s := f.String()
-		switch f.(type) {
-		case *filterOr:
+		if _, ok := f.(*filterOr); ok {
 			s = "(" + s + ")"
 		}
 		a[i] = s
@@ -49,8 +53,8 @@ func (fa *filterAnd) applyToBlockResult(br *blockResult, bm *bitmap) {
 }
 
 func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
-	if !fa.matchMessageBloomFilter(bs) {
-		// Fast path - fa doesn't match _msg bloom filter.
+	if !fa.matchBloomFilters(bs) {
+		// Fast path - fa doesn't match bloom filters.
 		bm.resetBits()
 		return
 	}
@@ -66,60 +70,114 @@ func (fa *filterAnd) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
 	}
 }
 
-func (fa *filterAnd) matchMessageBloomFilter(bs *blockSearch) bool {
-	tokens := fa.getMessageTokens()
-	if len(tokens) == 0 {
+func (fa *filterAnd) matchBloomFilters(bs *blockSearch) bool {
+	byFieldTokens := fa.getByFieldTokens()
+	if len(byFieldTokens) == 0 {
 		return true
 	}
 
-	v := bs.csh.getConstColumnValue("_msg")
-	if v != "" {
-		return matchStringByAllTokens(v, tokens)
+	for _, fieldTokens := range byFieldTokens {
+		fieldName := fieldTokens.field
+		tokens := fieldTokens.tokens
+
+		v := bs.csh.getConstColumnValue(fieldName)
+		if v != "" {
+			if !matchStringByAllTokens(v, tokens) {
+				return false
+			}
+			continue
+		}
+
+		ch := bs.csh.getColumnHeader(fieldName)
+		if ch == nil {
+			return false
+		}
+
+		if ch.valueType == valueTypeDict {
+			if !matchDictValuesByAllTokens(ch.valuesDict.values, tokens) {
+				return false
+			}
+			continue
+		}
+		if !matchBloomFilterAllTokens(bs, ch, tokens) {
+			return false
+		}
 	}
 
-	ch := bs.csh.getColumnHeader("_msg")
-	if ch == nil {
-		return false
-	}
-
-	if ch.valueType == valueTypeDict {
-		return matchDictValuesByAllTokens(ch.valuesDict.values, tokens)
-	}
-	return matchBloomFilterAllTokens(bs, ch, tokens)
+	return true
 }
 
-func (fa *filterAnd) getMessageTokens() []string {
-	fa.msgTokensOnce.Do(fa.initMsgTokens)
-	return fa.msgTokens
+func (fa *filterAnd) getByFieldTokens() []fieldTokens {
+	fa.byFieldTokensOnce.Do(fa.initByFieldTokens)
+	return fa.byFieldTokens
 }
 
-func (fa *filterAnd) initMsgTokens() {
-	var a []string
+func (fa *filterAnd) initByFieldTokens() {
+	m := make(map[string]map[string]struct{})
+	byFieldFilters := make(map[string]int)
+	var fieldNames []string
+
 	for _, f := range fa.filters {
+		fieldName := ""
+		var tokens []string
+
 		switch t := f.(type) {
-		case *filterPhrase:
-			if isMsgFieldName(t.fieldName) {
-				a = append(a, t.getTokens()...)
-			}
-		case *filterSequence:
-			if isMsgFieldName(t.fieldName) {
-				a = append(a, t.getTokens()...)
-			}
 		case *filterExact:
-			if isMsgFieldName(t.fieldName) {
-				a = append(a, t.getTokens()...)
-			}
+			fieldName = t.fieldName
+			tokens = t.getTokens()
 		case *filterExactPrefix:
-			if isMsgFieldName(t.fieldName) {
-				a = append(a, t.getTokens()...)
-			}
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterPhrase:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
 		case *filterPrefix:
-			if isMsgFieldName(t.fieldName) {
-				a = append(a, t.getTokens()...)
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterRegexp:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterSequence:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		}
+
+		fieldName = getCanonicalColumnName(fieldName)
+		byFieldFilters[fieldName]++
+
+		if len(tokens) > 0 {
+			mTokens, ok := m[fieldName]
+			if !ok {
+				fieldNames = append(fieldNames, fieldName)
+				mTokens = make(map[string]struct{})
+				m[fieldName] = mTokens
+			}
+			for _, token := range tokens {
+				mTokens[token] = struct{}{}
 			}
 		}
 	}
-	fa.msgTokens = a
+
+	var byFieldTokens []fieldTokens
+	for _, fieldName := range fieldNames {
+		if byFieldFilters[fieldName] < 2 {
+			// It is faster to perform bloom filter match inline when visiting the corresponding column
+			continue
+		}
+
+		mTokens := m[fieldName]
+		tokens := make([]string, 0, len(mTokens))
+		for token := range mTokens {
+			tokens = append(tokens, token)
+		}
+
+		byFieldTokens = append(byFieldTokens, fieldTokens{
+			field:  fieldName,
+			tokens: tokens,
+		})
+	}
+
+	fa.byFieldTokens = byFieldTokens
 }
 
 func matchStringByAllTokens(v string, tokens []string) bool {
diff --git a/lib/logstorage/filter_in.go b/lib/logstorage/filter_in.go
index b1d7b48211..742a0fb40d 100644
--- a/lib/logstorage/filter_in.go
+++ b/lib/logstorage/filter_in.go
@@ -3,6 +3,7 @@ package logstorage
 import (
 	"fmt"
 	"math"
+	"slices"
 	"strings"
 	"sync"
 
@@ -27,8 +28,9 @@ type filterIn struct {
 	// qFieldName must be set to field name for obtaining values from if q is non-nil.
 	qFieldName string
 
-	tokenSetsOnce sync.Once
-	tokenSets     [][]string
+	tokensOnce   sync.Once
+	commonTokens []string
+	tokenSets    [][]string
 
 	stringValuesOnce sync.Once
 	stringValues     map[string]struct{}
@@ -74,28 +76,15 @@ func (fi *filterIn) updateNeededFields(neededFields fieldsSet) {
 	neededFields.add(fi.fieldName)
 }
 
-func (fi *filterIn) getTokenSets() [][]string {
-	fi.tokenSetsOnce.Do(fi.initTokenSets)
-	return fi.tokenSets
+func (fi *filterIn) getTokens() ([]string, [][]string) {
+	fi.tokensOnce.Do(fi.initTokens)
+	return fi.commonTokens, fi.tokenSets
 }
 
-// It is faster to match every row in the block instead of checking too big number of tokenSets against bloom filter.
-const maxTokenSetsToInit = 1000
+func (fi *filterIn) initTokens() {
+	commonTokens, tokenSets := getCommonTokensAndTokenSets(fi.values)
 
-func (fi *filterIn) initTokenSets() {
-	values := fi.values
-	tokenSetsLen := len(values)
-	if tokenSetsLen > maxTokenSetsToInit {
-		tokenSetsLen = maxTokenSetsToInit
-	}
-	tokenSets := make([][]string, 0, tokenSetsLen+1)
-	for _, v := range values {
-		tokens := tokenizeStrings(nil, []string{v})
-		tokenSets = append(tokenSets, tokens)
-		if len(tokens) > maxTokenSetsToInit {
-			break
-		}
-	}
+	fi.commonTokens = commonTokens
 	fi.tokenSets = tokenSets
 }
 
@@ -385,47 +374,47 @@ func (fi *filterIn) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
 		return
 	}
 
-	tokenSets := fi.getTokenSets()
+	commonTokens, tokenSets := fi.getTokens()
 
 	switch ch.valueType {
 	case valueTypeString:
 		stringValues := fi.getStringValues()
-		matchAnyValue(bs, ch, bm, stringValues, tokenSets)
+		matchAnyValue(bs, ch, bm, stringValues, commonTokens, tokenSets)
 	case valueTypeDict:
 		stringValues := fi.getStringValues()
 		matchValuesDictByAnyValue(bs, ch, bm, stringValues)
 	case valueTypeUint8:
 		binValues := fi.getUint8Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeUint16:
 		binValues := fi.getUint16Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeUint32:
 		binValues := fi.getUint32Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeUint64:
 		binValues := fi.getUint64Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeFloat64:
 		binValues := fi.getFloat64Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeIPv4:
 		binValues := fi.getIPv4Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	case valueTypeTimestampISO8601:
 		binValues := fi.getTimestampISO8601Values()
-		matchAnyValue(bs, ch, bm, binValues, tokenSets)
+		matchAnyValue(bs, ch, bm, binValues, commonTokens, tokenSets)
 	default:
 		logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
 	}
 }
 
-func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, tokenSets [][]string) {
+func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}, commonTokens []string, tokenSets [][]string) {
 	if len(values) == 0 {
 		bm.resetBits()
 		return
 	}
-	if !matchBloomFilterAnyTokenSet(bs, ch, tokenSets) {
+	if !matchBloomFilterAnyTokenSet(bs, ch, commonTokens, tokenSets) {
 		bm.resetBits()
 		return
 	}
@@ -435,7 +424,10 @@ func matchAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[str
 	})
 }
 
-func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets [][]string) bool {
+func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, commonTokens []string, tokenSets [][]string) bool {
+	if len(commonTokens) > 0 {
+		return matchBloomFilterAllTokens(bs, ch, commonTokens)
+	}
 	if len(tokenSets) == 0 {
 		return false
 	}
@@ -453,6 +445,9 @@ func matchBloomFilterAnyTokenSet(bs *blockSearch, ch *columnHeader, tokenSets []
 	return false
 }
 
+// It is faster to match every row in the block instead of checking too big number of tokenSets against bloom filter.
+const maxTokenSetsToInit = 1000
+
 func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, values map[string]struct{}) {
 	bb := bbPool.Get()
 	for _, v := range ch.valuesDict.values {
@@ -465,3 +460,44 @@ func matchValuesDictByAnyValue(bs *blockSearch, ch *columnHeader, bm *bitmap, va
 	matchEncodedValuesDict(bs, ch, bm, bb.B)
 	bbPool.Put(bb)
 }
+
+func getCommonTokensAndTokenSets(values []string) ([]string, [][]string) {
+	tokenSets := make([][]string, len(values))
+	for i, v := range values {
+		tokenSets[i] = tokenizeStrings(nil, []string{v})
+	}
+
+	commonTokens := getCommonTokens(tokenSets)
+	if len(commonTokens) == 0 {
+		return nil, tokenSets
+	}
+	return commonTokens, nil
+}
+
+func getCommonTokens(tokenSets [][]string) []string {
+	if len(tokenSets) == 0 {
+		return nil
+	}
+
+	m := make(map[string]struct{}, len(tokenSets[0]))
+	for _, token := range tokenSets[0] {
+		m[token] = struct{}{}
+	}
+
+	for _, tokens := range tokenSets[1:] {
+		if len(m) == 0 {
+			return nil
+		}
+		for token := range m {
+			if !slices.Contains(tokens, token) {
+				delete(m, token)
+			}
+		}
+	}
+
+	tokens := make([]string, 0, len(m))
+	for token := range m {
+		tokens = append(tokens, token)
+	}
+	return tokens
+}
diff --git a/lib/logstorage/filter_in_test.go b/lib/logstorage/filter_in_test.go
index ffe8e944bd..29f1e35ff5 100644
--- a/lib/logstorage/filter_in_test.go
+++ b/lib/logstorage/filter_in_test.go
@@ -1,6 +1,8 @@
 package logstorage
 
 import (
+	"reflect"
+	"slices"
 	"testing"
 )
 
@@ -688,3 +690,32 @@ func TestFilterIn(t *testing.T) {
 		testFilterMatchForColumns(t, columns, fi, "_msg", nil)
 	})
 }
+
+func TestGetCommonTokensAndTokenSets(t *testing.T) {
+	f := func(values []string, commonTokensExpected []string, tokenSetsExpected [][]string) {
+		t.Helper()
+
+		commonTokens, tokenSets := getCommonTokensAndTokenSets(values)
+		slices.Sort(commonTokens)
+
+		if !reflect.DeepEqual(commonTokens, commonTokensExpected) {
+			t.Fatalf("unexpected commonTokens for values=%q\ngot\n%q\nwant\n%q", values, commonTokens, commonTokensExpected)
+		}
+
+		for i, tokens := range tokenSets {
+			slices.Sort(tokens)
+			tokensExpected := tokenSetsExpected[i]
+			if !reflect.DeepEqual(tokens, tokensExpected) {
+				t.Fatalf("unexpected tokens for value=%q\ngot\n%q\nwant\n%q", values[i], tokens, tokensExpected)
+			}
+		}
+	}
+
+	f(nil, nil, nil)
+	f([]string{"foo"}, []string{"foo"}, nil)
+	f([]string{"foo", "foo"}, []string{"foo"}, nil)
+	f([]string{"foo", "bar", "bar", "foo"}, nil, [][]string{{"foo"}, {"bar"}, {"bar"}, {"foo"}})
+	f([]string{"foo", "foo bar", "bar foo"}, []string{"foo"}, nil)
+	f([]string{"a foo bar", "bar abc foo", "foo abc a bar"}, []string{"bar", "foo"}, nil)
+	f([]string{"a xfoo bar", "xbar abc foo", "foo abc a bar"}, nil, [][]string{{"a", "bar", "xfoo"}, {"abc", "foo", "xbar"}, {"a", "abc", "bar", "foo"}})
+}
diff --git a/lib/logstorage/filter_or.go b/lib/logstorage/filter_or.go
index 5349c43705..256337a0cc 100644
--- a/lib/logstorage/filter_or.go
+++ b/lib/logstorage/filter_or.go
@@ -2,6 +2,7 @@ package logstorage
 
 import (
 	"strings"
+	"sync"
 )
 
 // filterOr contains filters joined by OR operator.
@@ -9,6 +10,9 @@ import (
 // It is epxressed as `f1 OR f2 ... OR fN` in LogsQL.
 type filterOr struct {
 	filters []filter
+
+	byFieldTokensOnce sync.Once
+	byFieldTokens     []fieldTokens
 }
 
 func (fo *filterOr) String() string {
@@ -51,6 +55,12 @@ func (fo *filterOr) applyToBlockResult(br *blockResult, bm *bitmap) {
 }
 
 func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
+	if !fo.matchBloomFilters(bs) {
+		// Fast path - fo doesn't match bloom filters.
+		bm.resetBits()
+		return
+	}
+
 	bmResult := getBitmap(bm.bitsLen)
 	bmTmp := getBitmap(bm.bitsLen)
 	for _, f := range fo.filters {
@@ -72,3 +82,105 @@ func (fo *filterOr) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
 	bm.copyFrom(bmResult)
 	putBitmap(bmResult)
 }
+
+func (fo *filterOr) matchBloomFilters(bs *blockSearch) bool {
+	byFieldTokens := fo.getByFieldTokens()
+	if len(byFieldTokens) == 0 {
+		return true
+	}
+
+	for _, fieldTokens := range byFieldTokens {
+		fieldName := fieldTokens.field
+		tokens := fieldTokens.tokens
+
+		v := bs.csh.getConstColumnValue(fieldName)
+		if v != "" {
+			if matchStringByAllTokens(v, tokens) {
+				return true
+			}
+			continue
+		}
+
+		ch := bs.csh.getColumnHeader(fieldName)
+		if ch == nil {
+			continue
+		}
+
+		if ch.valueType == valueTypeDict {
+			if matchDictValuesByAllTokens(ch.valuesDict.values, tokens) {
+				return true
+			}
+			continue
+		}
+		if matchBloomFilterAllTokens(bs, ch, tokens) {
+			return true
+		}
+	}
+
+	return false
+}
+
+func (fo *filterOr) getByFieldTokens() []fieldTokens {
+	fo.byFieldTokensOnce.Do(fo.initByFieldTokens)
+	return fo.byFieldTokens
+}
+
+func (fo *filterOr) initByFieldTokens() {
+	m := make(map[string][][]string)
+	byFieldFilters := make(map[string]int)
+	var fieldNames []string
+
+	for _, f := range fo.filters {
+		fieldName := ""
+		var tokens []string
+
+		switch t := f.(type) {
+		case *filterExact:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterExactPrefix:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterPhrase:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterPrefix:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterRegexp:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		case *filterSequence:
+			fieldName = t.fieldName
+			tokens = t.getTokens()
+		}
+
+		fieldName = getCanonicalColumnName(fieldName)
+		byFieldFilters[fieldName]++
+
+		if len(tokens) > 0 {
+			if _, ok := m[fieldName]; !ok {
+				fieldNames = append(fieldNames, fieldName)
+			}
+			m[fieldName] = append(m[fieldName], tokens)
+		}
+	}
+
+	var byFieldTokens []fieldTokens
+	for _, fieldName := range fieldNames {
+		if byFieldFilters[fieldName] < 2 {
+			// It is faster to perform bloom filter match inline when visiting the corresponding column
+			continue
+		}
+
+		commonTokens := getCommonTokens(m[fieldName])
+		if len(commonTokens) > 0 {
+			byFieldTokens = append(byFieldTokens, fieldTokens{
+				field:  fieldName,
+				tokens: commonTokens,
+			})
+		}
+	}
+
+	fo.byFieldTokens = byFieldTokens
+}
diff --git a/lib/logstorage/filter_string_range.go b/lib/logstorage/filter_string_range.go
index 4ab081b606..0951597157 100644
--- a/lib/logstorage/filter_string_range.go
+++ b/lib/logstorage/filter_string_range.go
@@ -1,11 +1,11 @@
 package logstorage
 
 import (
-	"fmt"
-
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 )
 
+var maxStringRangeValue = string([]byte{255, 255, 255, 255})
+
 // filterStringRange matches tie given string range [minValue..maxValue)
 //
 // Note that the minValue is included in the range, while the maxValue isn't included in the range.
@@ -16,10 +16,12 @@ type filterStringRange struct {
 	fieldName string
 	minValue  string
 	maxValue  string
+
+	stringRepr string
 }
 
 func (fr *filterStringRange) String() string {
-	return fmt.Sprintf("%sstring_range(%s, %s)", quoteFieldNameIfNeeded(fr.fieldName), quoteTokenIfNeeded(fr.minValue), quoteTokenIfNeeded(fr.maxValue))
+	return quoteFieldNameIfNeeded(fr.fieldName) + fr.stringRepr
 }
 
 func (fr *filterStringRange) updateNeededFields(neededFields fieldsSet) {
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 0220d96557..87e4ecc00b 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -74,6 +74,11 @@ func (lex *lexer) isQuotedToken() bool {
 	return lex.token != lex.rawToken
 }
 
+func (lex *lexer) isNumber() bool {
+	s := lex.rawToken + lex.s
+	return isNumberPrefix(s)
+}
+
 func (lex *lexer) isPrevToken(tokens ...string) bool {
 	for _, token := range tokens {
 		if token == lex.prevToken {
@@ -247,7 +252,7 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
 		s := fmt.Sprintf("stats by (%s) count() hits", byFieldsStr)
 		lex := newLexer(s)
 
-		ps, err := parsePipeStats(lex)
+		ps, err := parsePipeStats(lex, true)
 		if err != nil {
 			logger.Panicf("BUG: unexpected error when parsing [%s]: %s", s, err)
 		}
@@ -855,6 +860,8 @@ func parseFilterStringRange(lex *lexer, fieldName string) (filter, error) {
 			fieldName: fieldName,
 			minValue:  args[0],
 			maxValue:  args[1],
+
+			stringRepr: fmt.Sprintf("string_range(%s, %s)", quoteTokenIfNeeded(args[0]), quoteTokenIfNeeded(args[1])),
 		}
 		return fr, nil
 	})
@@ -1091,6 +1098,15 @@ func parseFilterGT(lex *lexer, fieldName string) (filter, error) {
 		op = ">="
 	}
 
+	if !lex.isNumber() {
+		lexState := lex.backupState()
+		fr := tryParseFilterGTString(lex, fieldName, op, includeMinValue)
+		if fr != nil {
+			return fr, nil
+		}
+		lex.restoreState(lexState)
+	}
+
 	minValue, fStr, err := parseFloat64(lex)
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err)
@@ -1120,6 +1136,15 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) {
 		op = "<="
 	}
 
+	if !lex.isNumber() {
+		lexState := lex.backupState()
+		fr := tryParseFilterLTString(lex, fieldName, op, includeMaxValue)
+		if fr != nil {
+			return fr, nil
+		}
+		lex.restoreState(lexState)
+	}
+
 	maxValue, fStr, err := parseFloat64(lex)
 	if err != nil {
 		return nil, fmt.Errorf("cannot parse number after '%s': %w", op, err)
@@ -1138,6 +1163,43 @@ func parseFilterLT(lex *lexer, fieldName string) (filter, error) {
 	return fr, nil
 }
 
+func tryParseFilterGTString(lex *lexer, fieldName, op string, includeMinValue bool) filter {
+	minValueOrig, err := getCompoundToken(lex)
+	if err != nil {
+		return nil
+	}
+	minValue := minValueOrig
+	if !includeMinValue {
+		minValue = string(append([]byte(minValue), 0))
+	}
+	fr := &filterStringRange{
+		fieldName: fieldName,
+		minValue:  minValue,
+		maxValue:  maxStringRangeValue,
+
+		stringRepr: op + quoteStringTokenIfNeeded(minValueOrig),
+	}
+	return fr
+}
+
+func tryParseFilterLTString(lex *lexer, fieldName, op string, includeMaxValue bool) filter {
+	maxValueOrig, err := getCompoundToken(lex)
+	if err != nil {
+		return nil
+	}
+	maxValue := maxValueOrig
+	if includeMaxValue {
+		maxValue = string(append([]byte(maxValue), 0))
+	}
+	fr := &filterStringRange{
+		fieldName: fieldName,
+		maxValue:  maxValue,
+
+		stringRepr: op + quoteStringTokenIfNeeded(maxValueOrig),
+	}
+	return fr
+}
+
 func parseFilterRange(lex *lexer, fieldName string) (filter, error) {
 	funcName := lex.token
 	lex.nextToken()
@@ -1495,6 +1557,13 @@ func parseTime(lex *lexer) (int64, string, error) {
 	return int64(math.Round(t*1e3)) * 1e6, s, nil
 }
 
+func quoteStringTokenIfNeeded(s string) string {
+	if !needQuoteStringToken(s) {
+		return s
+	}
+	return strconv.Quote(s)
+}
+
 func quoteTokenIfNeeded(s string) string {
 	if !needQuoteToken(s) {
 		return s
@@ -1502,6 +1571,23 @@ func quoteTokenIfNeeded(s string) string {
 	return strconv.Quote(s)
 }
 
+func needQuoteStringToken(s string) bool {
+	return isNumberPrefix(s) || needQuoteToken(s)
+}
+
+func isNumberPrefix(s string) bool {
+	if len(s) == 0 {
+		return false
+	}
+	if s[0] == '-' || s[0] == '+' {
+		s = s[1:]
+		if len(s) == 0 {
+			return false
+		}
+	}
+	return s[0] >= '0' && s[0] <= '9'
+}
+
 func needQuoteToken(s string) bool {
 	sLower := strings.ToLower(s)
 	if _, ok := reservedKeywords[sLower]; ok {
diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go
index 4b80b66b87..01f89c5fe3 100644
--- a/lib/logstorage/parser_test.go
+++ b/lib/logstorage/parser_test.go
@@ -353,6 +353,10 @@ func TestParseFilterStringRange(t *testing.T) {
 
 	f("string_range(foo, bar)", ``, "foo", "bar")
 	f(`abc:string_range("foo,bar", "baz) !")`, `abc`, `foo,bar`, `baz) !`)
+	f(">foo", ``, "foo\x00", maxStringRangeValue)
+	f("x:>=foo", `x`, "foo", maxStringRangeValue)
+	f("x:<foo", `x`, ``, `foo`)
+	f(`<="123"`, ``, ``, "123\x00")
 }
 
 func TestParseFilterRegexp(t *testing.T) {
@@ -527,9 +531,9 @@ func TestParseRangeFilter(t *testing.T) {
 	f(`foo:>=10.43`, `foo`, 10.43, inf)
 	f(`foo: >= -10.43`, `foo`, -10.43, inf)
 
-	f(`foo:<10.43`, `foo`, -inf, nextafter(10.43, -inf))
+	f(`foo:<10.43K`, `foo`, -inf, nextafter(10_430, -inf))
 	f(`foo: < -10.43`, `foo`, -inf, nextafter(-10.43, -inf))
-	f(`foo:<=10.43`, `foo`, -inf, 10.43)
+	f(`foo:<=10.43ms`, `foo`, -inf, 10_430_000)
 	f(`foo: <= 10.43`, `foo`, -inf, 10.43)
 }
 
@@ -590,6 +594,10 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`NOT foo AND bar OR baz`, `!foo bar or baz`)
 	f(`NOT (foo AND bar) OR baz`, `!(foo bar) or baz`)
 	f(`foo OR bar AND baz`, `foo or bar baz`)
+	f(`foo bar or baz xyz`, `foo bar or baz xyz`)
+	f(`foo (bar or baz) xyz`, `foo (bar or baz) xyz`)
+	f(`foo or bar baz or xyz`, `foo or bar baz or xyz`)
+	f(`(foo or bar) (baz or xyz)`, `(foo or bar) (baz or xyz)`)
 	f(`(foo OR bar) AND baz`, `(foo or bar) baz`)
 
 	// parens
@@ -802,6 +810,12 @@ func TestParseQuerySuccess(t *testing.T) {
 	// string_range filter
 	f(`string_range(foo, bar)`, `string_range(foo, bar)`)
 	f(`foo:string_range("foo, bar", baz)`, `foo:string_range("foo, bar", baz)`)
+	f(`foo:>bar`, `foo:>bar`)
+	f(`foo:>"1234"`, `foo:>"1234"`)
+	f(`>="abc"`, `>=abc`)
+	f(`foo:<bar`, `foo:<bar`)
+	f(`foo:<"-12.34"`, `foo:<"-12.34"`)
+	f(`<="abc < de"`, `<="abc < de"`)
 
 	// reserved field names
 	f(`"_stream"`, `_stream`)
@@ -869,8 +883,10 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`* | DELETE foo, bar`, `* | delete foo, bar`)
 
 	// limit and head pipe
-	f(`foo | limit 10`, `foo | limit 10`)
-	f(`foo | head 10`, `foo | limit 10`)
+	f(`foo | limit`, `foo | limit 10`)
+	f(`foo | head`, `foo | limit 10`)
+	f(`foo | limit 20`, `foo | limit 20`)
+	f(`foo | head 20`, `foo | limit 20`)
 	f(`foo | HEAD 1_123_432`, `foo | limit 1123432`)
 	f(`foo | head 10K`, `foo | limit 10000`)
 
@@ -1065,6 +1081,10 @@ func TestParseQuerySuccess(t *testing.T) {
 	f(`foo | # some comment | foo bar
 	  fields x # another comment
 	  |filter "foo#this#isn't a comment"#this is comment`, `foo | fields x | filter "foo#this#isn't a comment"`)
+
+	// skip 'stats' and 'filter' prefixes
+	f(`* | by (host) count() rows | rows:>10`, `* | stats by (host) count(*) as rows | filter rows:>10`)
+	f(`* | (host) count() rows, count() if (error) errors | rows:>10`, `* | stats by (host) count(*) as rows, count(*) if (error) as errors | filter rows:>10`)
 }
 
 func TestParseQueryFailure(t *testing.T) {
@@ -1082,7 +1102,7 @@ func TestParseQueryFailure(t *testing.T) {
 	f("")
 	f("|")
 	f("foo|")
-	f("foo|bar")
+	f("foo|bar(")
 	f("foo and")
 	f("foo OR ")
 	f("not")
@@ -1151,7 +1171,7 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`very long query with error aaa ffdfd fdfdfd fdfd:( ffdfdfdfdfd`)
 
 	// query with unexpected tail
-	f(`foo | bar`)
+	f(`foo | bar(`)
 
 	// unexpected comma
 	f(`foo,bar`)
@@ -1264,6 +1284,7 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`string_range(foo, bar`)
 	f(`string_range(foo)`)
 	f(`string_range(foo, bar, baz)`)
+	f(`>(`)
 
 	// missing filter
 	f(`| fields *`)
@@ -1271,9 +1292,9 @@ func TestParseQueryFailure(t *testing.T) {
 	// missing pipe keyword
 	f(`foo |`)
 
-	// unknown pipe keyword
-	f(`foo | bar`)
-	f(`foo | fields bar | baz`)
+	// invlaid pipe
+	f(`foo | bar(`)
+	f(`foo | fields bar | baz(`)
 
 	// missing field in fields pipe
 	f(`foo | fields`)
@@ -1313,10 +1334,6 @@ func TestParseQueryFailure(t *testing.T) {
 	f(`foo | delete foo,`)
 	f(`foo | delete foo,,`)
 
-	// missing limit and head pipe value
-	f(`foo | limit`)
-	f(`foo | head`)
-
 	// invalid limit pipe value
 	f(`foo | limit bar`)
 	f(`foo | limit -123`)
diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go
index d01fdc4bea..32a72d0dc5 100644
--- a/lib/logstorage/pipe.go
+++ b/lib/logstorage/pipe.go
@@ -86,6 +86,8 @@ func parsePipes(lex *lexer) ([]pipe, error) {
 			lex.nextToken()
 		case lex.isKeyword(")", ""):
 			return pipes, nil
+		default:
+			return nil, fmt.Errorf("unexpected token after [%s]: %q; expecting '|' or ')'", pipes[len(pipes)-1], lex.token)
 		}
 	}
 }
@@ -98,7 +100,7 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'copy' pipe: %w", err)
 		}
 		return pc, nil
-	case lex.isKeyword("delete", "del", "rm"):
+	case lex.isKeyword("delete", "del", "rm", "drop"):
 		pd, err := parsePipeDelete(lex)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
@@ -110,12 +112,24 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'extract' pipe: %w", err)
 		}
 		return pe, nil
+	case lex.isKeyword("extract_regexp"):
+		pe, err := parsePipeExtractRegexp(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'extract_regexp' pipe: %w", err)
+		}
+		return pe, nil
 	case lex.isKeyword("field_names"):
 		pf, err := parsePipeFieldNames(lex)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'field_names' pipe: %w", err)
 		}
 		return pf, nil
+	case lex.isKeyword("field_values"):
+		pf, err := parsePipeFieldValues(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot pase 'field_values' pipe: %w", err)
+		}
+		return pf, nil
 	case lex.isKeyword("fields", "keep"):
 		pf, err := parsePipeFields(lex)
 		if err != nil {
@@ -123,7 +137,7 @@ func parsePipe(lex *lexer) (pipe, error) {
 		}
 		return pf, nil
 	case lex.isKeyword("filter"):
-		pf, err := parsePipeFilter(lex)
+		pf, err := parsePipeFilter(lex, true)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'filter' pipe: %w", err)
 		}
@@ -140,6 +154,12 @@ func parsePipe(lex *lexer) (pipe, error) {
 			return nil, fmt.Errorf("cannot parse 'limit' pipe: %w", err)
 		}
 		return pl, nil
+	case lex.isKeyword("math"):
+		pm, err := parsePipeMath(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'math' pipe: %w", err)
+		}
+		return pm, nil
 	case lex.isKeyword("offset", "skip"):
 		ps, err := parsePipeOffset(lex)
 		if err != nil {
@@ -177,7 +197,7 @@ func parsePipe(lex *lexer) (pipe, error) {
 		}
 		return ps, nil
 	case lex.isKeyword("stats"):
-		ps, err := parsePipeStats(lex)
+		ps, err := parsePipeStats(lex, true)
 		if err != nil {
 			return nil, fmt.Errorf("cannot parse 'stats' pipe: %w", err)
 		}
@@ -207,6 +227,22 @@ func parsePipe(lex *lexer) (pipe, error) {
 		}
 		return pu, nil
 	default:
+		lexState := lex.backupState()
+
+		// Try parsing stats pipe without 'stats' keyword
+		ps, err := parsePipeStats(lex, false)
+		if err == nil {
+			return ps, nil
+		}
+		lex.restoreState(lexState)
+
+		// Try parsing filter pipe without 'filter' keyword
+		pf, err := parsePipeFilter(lex, false)
+		if err == nil {
+			return pf, nil
+		}
+		lex.restoreState(lexState)
+
 		return nil, fmt.Errorf("unexpected pipe %q", lex.token)
 	}
 }
diff --git a/lib/logstorage/pipe_delete.go b/lib/logstorage/pipe_delete.go
index 9f2ca5a686..543b769fb7 100644
--- a/lib/logstorage/pipe_delete.go
+++ b/lib/logstorage/pipe_delete.go
@@ -70,8 +70,8 @@ func (pdp *pipeDeleteProcessor) flush() error {
 }
 
 func parsePipeDelete(lex *lexer) (*pipeDelete, error) {
-	if !lex.isKeyword("delete", "del", "rm") {
-		return nil, fmt.Errorf("expecting 'delete', 'del' or 'rm'; got %q", lex.token)
+	if !lex.isKeyword("delete", "del", "rm", "drop") {
+		return nil, fmt.Errorf("expecting 'delete', 'del', 'rm' or 'drop'; got %q", lex.token)
 	}
 
 	var fields []string
diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go
index e5c950592e..a0cfe16f42 100644
--- a/lib/logstorage/pipe_extract.go
+++ b/lib/logstorage/pipe_extract.go
@@ -64,13 +64,14 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
 		unneededFieldsOrig := unneededFields.clone()
 		needFromField := false
 		for _, step := range pe.ptn.steps {
-			if step.field != "" {
-				if !unneededFieldsOrig.contains(step.field) {
-					needFromField = true
-				}
-				if !pe.keepOriginalFields && !pe.skipEmptyResults {
-					unneededFields.add(step.field)
-				}
+			if step.field == "" {
+				continue
+			}
+			if !unneededFieldsOrig.contains(step.field) {
+				needFromField = true
+			}
+			if !pe.keepOriginalFields && !pe.skipEmptyResults {
+				unneededFields.add(step.field)
 			}
 		}
 		if needFromField {
@@ -85,7 +86,10 @@ func (pe *pipeExtract) updateNeededFields(neededFields, unneededFields fieldsSet
 		neededFieldsOrig := neededFields.clone()
 		needFromField := false
 		for _, step := range pe.ptn.steps {
-			if step.field != "" && neededFieldsOrig.contains(step.field) {
+			if step.field == "" {
+				continue
+			}
+			if neededFieldsOrig.contains(step.field) {
 				needFromField = true
 				if !pe.keepOriginalFields && !pe.skipEmptyResults {
 					neededFields.remove(step.field)
diff --git a/lib/logstorage/pipe_extract_regexp.go b/lib/logstorage/pipe_extract_regexp.go
new file mode 100644
index 0000000000..5a5c4017a4
--- /dev/null
+++ b/lib/logstorage/pipe_extract_regexp.go
@@ -0,0 +1,334 @@
+package logstorage
+
+import (
+	"fmt"
+	"regexp"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
+)
+
+// pipeExtractRegexp processes '| extract_regexp ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe
+type pipeExtractRegexp struct {
+	fromField string
+
+	re       *regexp.Regexp
+	reFields []string
+
+	keepOriginalFields bool
+	skipEmptyResults   bool
+
+	// iff is an optional filter for skipping the extract func
+	iff *ifFilter
+}
+
+func (pe *pipeExtractRegexp) String() string {
+	s := "extract_regexp"
+	if pe.iff != nil {
+		s += " " + pe.iff.String()
+	}
+	reStr := pe.re.String()
+	s += " " + quoteTokenIfNeeded(reStr)
+	if !isMsgFieldName(pe.fromField) {
+		s += " from " + quoteTokenIfNeeded(pe.fromField)
+	}
+	if pe.keepOriginalFields {
+		s += " keep_original_fields"
+	}
+	if pe.skipEmptyResults {
+		s += " skip_empty_results"
+	}
+	return s
+}
+
+func (pe *pipeExtractRegexp) optimize() {
+	pe.iff.optimizeFilterIn()
+}
+
+func (pe *pipeExtractRegexp) hasFilterInWithQuery() bool {
+	return pe.iff.hasFilterInWithQuery()
+}
+
+func (pe *pipeExtractRegexp) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
+	iffNew, err := pe.iff.initFilterInValues(cache, getFieldValuesFunc)
+	if err != nil {
+		return nil, err
+	}
+	peNew := *pe
+	peNew.iff = iffNew
+	return &peNew, nil
+}
+
+func (pe *pipeExtractRegexp) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	if neededFields.contains("*") {
+		unneededFieldsOrig := unneededFields.clone()
+		needFromField := false
+		for _, f := range pe.reFields {
+			if f == "" {
+				continue
+			}
+			if !unneededFieldsOrig.contains(f) {
+				needFromField = true
+			}
+			if !pe.keepOriginalFields && !pe.skipEmptyResults {
+				unneededFields.add(f)
+			}
+		}
+		if needFromField {
+			unneededFields.remove(pe.fromField)
+			if pe.iff != nil {
+				unneededFields.removeFields(pe.iff.neededFields)
+			}
+		} else {
+			unneededFields.add(pe.fromField)
+		}
+	} else {
+		neededFieldsOrig := neededFields.clone()
+		needFromField := false
+		for _, f := range pe.reFields {
+			if f == "" {
+				continue
+			}
+			if neededFieldsOrig.contains(f) {
+				needFromField = true
+				if !pe.keepOriginalFields && !pe.skipEmptyResults {
+					neededFields.remove(f)
+				}
+			}
+		}
+		if needFromField {
+			neededFields.add(pe.fromField)
+			if pe.iff != nil {
+				neededFields.addFields(pe.iff.neededFields)
+			}
+		}
+	}
+}
+
+func (pe *pipeExtractRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
+	return &pipeExtractRegexpProcessor{
+		pe:     pe,
+		ppNext: ppNext,
+
+		shards: make([]pipeExtractRegexpProcessorShard, workersCount),
+	}
+}
+
+type pipeExtractRegexpProcessor struct {
+	pe     *pipeExtractRegexp
+	ppNext pipeProcessor
+
+	shards []pipeExtractRegexpProcessorShard
+}
+
+type pipeExtractRegexpProcessorShard struct {
+	pipeExtractRegexpProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(pipeExtractRegexpProcessorShardNopad{})%128]byte
+}
+
+func (shard *pipeExtractRegexpProcessorShard) apply(re *regexp.Regexp, v string) {
+	shard.fields = slicesutil.SetLength(shard.fields, len(shard.rcs))
+	fields := shard.fields
+	clear(fields)
+
+	locs := re.FindStringSubmatchIndex(v)
+	if locs == nil {
+		return
+	}
+
+	for i := range fields {
+		start := locs[2*i]
+		if start < 0 {
+			// mismatch
+			continue
+		}
+		end := locs[2*i+1]
+		fields[i] = v[start:end]
+	}
+}
+
+type pipeExtractRegexpProcessorShardNopad struct {
+	bm bitmap
+
+	resultColumns []*blockResultColumn
+	resultValues  []string
+
+	rcs []resultColumn
+	a   arena
+
+	fields []string
+}
+
+func (pep *pipeExtractRegexpProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	pe := pep.pe
+	shard := &pep.shards[workerID]
+
+	bm := &shard.bm
+	bm.init(len(br.timestamps))
+	bm.setBits()
+	if iff := pe.iff; iff != nil {
+		iff.f.applyToBlockResult(br, bm)
+		if bm.isZero() {
+			pep.ppNext.writeBlock(workerID, br)
+			return
+		}
+	}
+
+	reFields := pe.reFields
+
+	shard.rcs = slicesutil.SetLength(shard.rcs, len(reFields))
+	rcs := shard.rcs
+	for i := range reFields {
+		rcs[i].name = reFields[i]
+	}
+
+	c := br.getColumnByName(pe.fromField)
+	values := c.getValues(br)
+
+	shard.resultColumns = slicesutil.SetLength(shard.resultColumns, len(rcs))
+	resultColumns := shard.resultColumns
+	for i := range resultColumns {
+		if reFields[i] != "" {
+			resultColumns[i] = br.getColumnByName(rcs[i].name)
+		}
+	}
+
+	shard.resultValues = slicesutil.SetLength(shard.resultValues, len(rcs))
+	resultValues := shard.resultValues
+
+	hadUpdates := false
+	vPrev := ""
+	for rowIdx, v := range values {
+		if bm.isSetBit(rowIdx) {
+			if !hadUpdates || vPrev != v {
+				vPrev = v
+				hadUpdates = true
+
+				shard.apply(pe.re, v)
+
+				for i, v := range shard.fields {
+					if reFields[i] == "" {
+						continue
+					}
+					if v == "" && pe.skipEmptyResults || pe.keepOriginalFields {
+						c := resultColumns[i]
+						if vOrig := c.getValueAtRow(br, rowIdx); vOrig != "" {
+							v = vOrig
+						}
+					} else {
+						v = shard.a.copyString(v)
+					}
+					resultValues[i] = v
+				}
+			}
+		} else {
+			for i, c := range resultColumns {
+				if reFields[i] != "" {
+					resultValues[i] = c.getValueAtRow(br, rowIdx)
+				}
+			}
+		}
+
+		for i, v := range resultValues {
+			if reFields[i] != "" {
+				rcs[i].addValue(v)
+			}
+		}
+	}
+
+	for i := range rcs {
+		if reFields[i] != "" {
+			br.addResultColumn(&rcs[i])
+		}
+	}
+	pep.ppNext.writeBlock(workerID, br)
+
+	for i := range rcs {
+		rcs[i].reset()
+	}
+	shard.a.reset()
+}
+
+func (pep *pipeExtractRegexpProcessor) flush() error {
+	return nil
+}
+
+func parsePipeExtractRegexp(lex *lexer) (*pipeExtractRegexp, error) {
+	if !lex.isKeyword("extract_regexp") {
+		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "extract_regexp")
+	}
+	lex.nextToken()
+
+	// parse optional if (...)
+	var iff *ifFilter
+	if lex.isKeyword("if") {
+		f, err := parseIfFilter(lex)
+		if err != nil {
+			return nil, err
+		}
+		iff = f
+	}
+
+	// parse pattern
+	patternStr, err := getCompoundToken(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot read 'pattern': %w", err)
+	}
+	re, err := regexp.Compile(patternStr)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse 'pattern' %q: %w", patternStr, err)
+	}
+	reFields := re.SubexpNames()
+
+	hasNamedFields := false
+	for _, f := range reFields {
+		if f != "" {
+			hasNamedFields = true
+			break
+		}
+	}
+	if !hasNamedFields {
+		return nil, fmt.Errorf("the 'pattern' %q must contain at least a single named group in the form (?P<group_name>...)", patternStr)
+	}
+
+	// parse optional 'from ...' part
+	fromField := "_msg"
+	if lex.isKeyword("from") {
+		lex.nextToken()
+		f, err := parseFieldName(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
+		}
+		fromField = f
+	}
+
+	keepOriginalFields := false
+	skipEmptyResults := false
+	switch {
+	case lex.isKeyword("keep_original_fields"):
+		lex.nextToken()
+		keepOriginalFields = true
+	case lex.isKeyword("skip_empty_results"):
+		lex.nextToken()
+		skipEmptyResults = true
+	}
+
+	pe := &pipeExtractRegexp{
+		fromField:          fromField,
+		re:                 re,
+		reFields:           reFields,
+		keepOriginalFields: keepOriginalFields,
+		skipEmptyResults:   skipEmptyResults,
+		iff:                iff,
+	}
+
+	return pe, nil
+}
diff --git a/lib/logstorage/pipe_extract_regexp_test.go b/lib/logstorage/pipe_extract_regexp_test.go
new file mode 100644
index 0000000000..e4fc7ced41
--- /dev/null
+++ b/lib/logstorage/pipe_extract_regexp_test.go
@@ -0,0 +1,329 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestParsePipeExtractRegexpSuccess(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeSuccess(t, pipeStr)
+	}
+
+	f(`extract_regexp "foo(?P<bar>.*)"`)
+	f(`extract_regexp "foo(?P<bar>.*)" skip_empty_results`)
+	f(`extract_regexp "foo(?P<bar>.*)" keep_original_fields`)
+	f(`extract_regexp "foo(?P<bar>.*)" from x`)
+	f(`extract_regexp "foo(?P<bar>.*)" from x skip_empty_results`)
+	f(`extract_regexp "foo(?P<bar>.*)" from x keep_original_fields`)
+	f(`extract_regexp if (x:y) "foo(?P<bar>.*)" from baz`)
+	f(`extract_regexp if (x:y) "foo(?P<bar>.*)" from baz skip_empty_results`)
+	f(`extract_regexp if (x:y) "foo(?P<bar>.*)" from baz keep_original_fields`)
+}
+
+func TestParsePipeExtractRegexpFailure(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeFailure(t, pipeStr)
+	}
+
+	f(`extract_regexp`)
+	f(`extract_regexp keep_original_fields`)
+	f(`extract_regexp skip_empty_results`)
+	f(`extract_regexp from`)
+	f(`extract_regexp from x`)
+	f(`extract_regexp from x "y(?P<foo>.*)"`)
+	f(`extract_regexp if (x:y)`)
+	f(`extract_regexp "a(?P<b>.*)" if (x:y)`)
+	f(`extract_regexp "a"`)
+	f(`extract_regexp "(foo)"`)
+}
+
+func TestPipeExtractRegexp(t *testing.T) {
+	f := func(pipeStr string, rows, rowsExpected [][]Field) {
+		t.Helper()
+		expectPipeResults(t, pipeStr, rows, rowsExpected)
+	}
+
+	// skip empty results
+	f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)" skip_empty_results`, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=`},
+			{"aa", "foobar"},
+			{"abc", "ippl"},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=`},
+			{"aa", "foobar"},
+			{"abc", `"x y=z"`},
+		},
+	})
+
+	// no skip empty results
+	f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)"`, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=`},
+			{"aa", "foobar"},
+			{"abc", "ippl"},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=`},
+			{"aa", ""},
+			{"abc", `"x y=z"`},
+		},
+	})
+
+	// keep original fields
+	f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)" keep_original_fields`, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+			{"aa", "foobar"},
+			{"abc", ""},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+			{"abc", `"x y=z"`},
+			{"aa", "foobar"},
+		},
+	})
+
+	// no keep original fields
+	f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)"`, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+			{"aa", "foobar"},
+			{"abc", ""},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+			{"abc", `"x y=z"`},
+			{"aa", "b"},
+		},
+	})
+
+	// single row, extract from _msg
+	f(`extract_regexp "baz=(?P<abc>.*) a=(?P<aa>.*)"`, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar baz="x y=z" a=b`},
+			{"abc", `"x y=z"`},
+			{"aa", "b"},
+		},
+	})
+
+	// single row, extract from _msg into _msg
+	f(`extract_regexp "msg=(?P<_msg>.*)"`, [][]Field{
+		{
+			{"_msg", `msg=bar`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", "bar"},
+		},
+	})
+
+	// single row, extract from non-existing field
+	f(`extract_regexp "foo=(?P<bar>.*)" from x`, [][]Field{
+		{
+			{"_msg", `foo=bar`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `foo=bar`},
+			{"bar", ""},
+		},
+	})
+
+	// single row, pattern mismatch
+	f(`extract_regexp "foo=(?P<bar>.*)" from x`, [][]Field{
+		{
+			{"x", `foobar`},
+		},
+	}, [][]Field{
+		{
+			{"x", `foobar`},
+			{"bar", ""},
+		},
+	})
+
+	f(`extract_regexp "foo=(?P<bar>.*) baz=(?P<xx>.*)" from x`, [][]Field{
+		{
+			{"x", `a foo="a\"b\\c" cde baz=aa`},
+		},
+	}, [][]Field{
+		{
+			{"x", `a foo="a\"b\\c" cde baz=aa`},
+			{"bar", `"a\"b\\c" cde`},
+			{"xx", "aa"},
+		},
+	})
+
+	// single row, overwirte existing column
+	f(`extract_regexp "foo=(?P<bar>.*) baz=(?P<xx>.*)" from x`, [][]Field{
+		{
+			{"x", `a foo=cc baz=aa b`},
+			{"bar", "abc"},
+		},
+	}, [][]Field{
+		{
+			{"x", `a foo=cc baz=aa b`},
+			{"bar", `cc`},
+			{"xx", `aa b`},
+		},
+	})
+
+	// single row, if match
+	f(`extract_regexp if (x:baz) "foo=(?P<bar>.*) baz=(?P<xx>.*)" from "x"`, [][]Field{
+		{
+			{"x", `a foo=cc baz=aa b`},
+			{"bar", "abc"},
+		},
+	}, [][]Field{
+		{
+			{"x", `a foo=cc baz=aa b`},
+			{"bar", `cc`},
+			{"xx", `aa b`},
+		},
+	})
+
+	// single row, if mismatch
+	f(`extract_regexp if (bar:"") "foo=(?P<bar>.*) baz=(?P<xx>.*)" from 'x'`, [][]Field{
+		{
+			{"x", `a foo=cc baz=aa b`},
+			{"bar", "abc"},
+		},
+	}, [][]Field{
+		{
+			{"x", `a foo=cc baz=aa b`},
+			{"bar", `abc`},
+		},
+	})
+
+	// multiple rows with distinct set of labels
+	f(`extract_regexp if (!ip:keep) "ip=(?P<ip>([0-9]+[.]){3}[0-9]+) "`, [][]Field{
+		{
+			{"foo", "bar"},
+			{"_msg", "request from ip=1.2.3.4 xxx"},
+			{"f3", "y"},
+		},
+		{
+			{"foo", "aaa"},
+			{"_msg", "ip=5.4.3.1 abcd"},
+			{"ip", "keep"},
+			{"a", "b"},
+		},
+		{
+			{"foo", "aaa"},
+			{"_msg", "ip=34.32.11.94 abcd"},
+			{"ip", "ppp"},
+			{"a", "b"},
+		},
+		{
+			{"foo", "klkfs"},
+			{"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"},
+			{"ip", "bbbsd"},
+			{"a", "klo2i"},
+		},
+	}, [][]Field{
+		{
+			{"foo", "bar"},
+			{"_msg", "request from ip=1.2.3.4 xxx"},
+			{"f3", "y"},
+			{"ip", "1.2.3.4"},
+		},
+		{
+			{"foo", "aaa"},
+			{"_msg", "ip=5.4.3.1 abcd"},
+			{"ip", "keep"},
+			{"a", "b"},
+		},
+		{
+			{"foo", "aaa"},
+			{"_msg", "ip=34.32.11.94 abcd"},
+			{"ip", "34.32.11.94"},
+			{"a", "b"},
+		},
+		{
+			{"foo", "klkfs"},
+			{"_msg", "sdfdsfds dsf fd fdsa ip=123 abcd"},
+			{"ip", ""},
+			{"a", "klo2i"},
+		},
+	})
+}
+
+func TestPipeExtractRegexpUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("extract_regexp '(?P<foo>.*)' from x", "*", "", "*", "foo")
+	f("extract_regexp if (foo:bar) '(?P<foo>.*)' from x", "*", "", "*", "")
+	f("extract_regexp if (foo:bar) '(?P<foo>.*)' from x keep_original_fields", "*", "", "*", "")
+	f("extract_regexp if (foo:bar) '(?P<foo>.*)' from x skip_empty_results", "*", "", "*", "")
+
+	// unneeded fields do not intersect with pattern and output fields
+	f("extract_regexp '(?P<foo>.*)' from x", "*", "f1,f2", "*", "f1,f2,foo")
+	f("extract_regexp '(?P<foo>.*)' from x keep_original_fields", "*", "f1,f2", "*", "f1,f2")
+	f("extract_regexp '(?P<foo>.*)' from x skip_empty_results", "*", "f1,f2", "*", "f1,f2")
+	f("extract_regexp if (f1:x) '(?P<foo>.*)' from x", "*", "f1,f2", "*", "f2,foo")
+	f("extract_regexp if (f1:x) '(?P<foo>.*)' from x keep_original_fields", "*", "f1,f2", "*", "f2")
+	f("extract_regexp if (f1:x) '(?P<foo>.*)' from x skip_empty_results", "*", "f1,f2", "*", "f2")
+	f("extract_regexp if (foo:bar f1:x) '(?P<foo>.*)' from x", "*", "f1,f2", "*", "f2")
+
+	// unneeded fields intersect with pattern
+	f("extract_regexp '(?P<foo>.*)' from x", "*", "f2,x", "*", "f2,foo")
+	f("extract_regexp '(?P<foo>.*)' from x keep_original_fields", "*", "f2,x", "*", "f2")
+	f("extract_regexp '(?P<foo>.*)' from x skip_empty_results", "*", "f2,x", "*", "f2")
+	f("extract_regexp if (f1:abc) '(?P<foo>.*)' from x", "*", "f2,x", "*", "f2,foo")
+	f("extract_regexp if (f2:abc) '(?P<foo>.*)' from x", "*", "f2,x", "*", "foo")
+
+	// unneeded fields intersect with output fields
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo", "*", "bar,f2,foo")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "*", "f2,foo", "*", "f2,foo")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "*", "f2,foo", "*", "f2,foo")
+	f("extract_regexp if (f1:abc) '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo", "*", "bar,f2,foo")
+	f("extract_regexp if (f2:abc foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo", "*", "bar")
+	f("extract_regexp if (f2:abc foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "*", "f2,foo", "*", "")
+	f("extract_regexp if (f2:abc foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "*", "f2,foo", "*", "")
+
+	// unneeded fields intersect with all the output fields
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
+	f("extract_regexp if (a:b f2:q x:y foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
+	f("extract_regexp if (a:b f2:q x:y foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
+	f("extract_regexp if (a:b f2:q x:y foo:w) '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "*", "f2,foo,bar", "*", "bar,f2,foo,x")
+
+	// needed fields do not intersect with pattern and output fields
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f1,f2", "", "f1,f2", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f1,f2", "", "f1,f2", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f1,f2", "", "f1,f2", "")
+	f("extract_regexp if (a:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f1,f2", "", "f1,f2", "")
+	f("extract_regexp if (f1:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f1,f2", "", "f1,f2", "")
+
+	// needed fields intersect with pattern field
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,x", "", "f2,x", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f2,x", "", "f2,x", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f2,x", "", "f2,x", "")
+	f("extract_regexp if (a:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,x", "", "f2,x", "")
+
+	// needed fields intersect with output fields
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo", "", "f2,x", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f2,foo", "", "foo,f2,x", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f2,foo", "", "foo,f2,x", "")
+	f("extract_regexp if (a:b) '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo", "", "a,f2,x", "")
+
+	// needed fields intersect with pattern and output fields
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo,x,y", "", "f2,x,y", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x keep_original_fields", "f2,foo,x,y", "", "foo,f2,x,y", "")
+	f("extract_regexp '(?P<foo>.*)x(?P<bar>.*)' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "")
+	f("extract_regexp if (a:b foo:q) '(?P<foo>.*)x(?P<bar>.*)' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
+}
diff --git a/lib/logstorage/pipe_extract_test.go b/lib/logstorage/pipe_extract_test.go
index 08c94c520c..7404205133 100644
--- a/lib/logstorage/pipe_extract_test.go
+++ b/lib/logstorage/pipe_extract_test.go
@@ -353,31 +353,3 @@ func TestPipeExtractUpdateNeededFields(t *testing.T) {
 	f("extract '<foo>x<bar>' from x skip_empty_results", "f2,foo,x,y", "", "foo,f2,x,y", "")
 	f("extract if (a:b foo:q) '<foo>x<bar>' from x", "f2,foo,x,y", "", "a,f2,foo,x,y", "")
 }
-
-func expectParsePipeFailure(t *testing.T, pipeStr string) {
-	t.Helper()
-
-	lex := newLexer(pipeStr)
-	p, err := parsePipe(lex)
-	if err == nil && lex.isEnd() {
-		t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p)
-	}
-}
-
-func expectParsePipeSuccess(t *testing.T, pipeStr string) {
-	t.Helper()
-
-	lex := newLexer(pipeStr)
-	p, err := parsePipe(lex)
-	if err != nil {
-		t.Fatalf("cannot parse [%s]: %s", pipeStr, err)
-	}
-	if !lex.isEnd() {
-		t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s)
-	}
-
-	pipeStrResult := p.String()
-	if pipeStrResult != pipeStr {
-		t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr)
-	}
-}
diff --git a/lib/logstorage/pipe_field_values.go b/lib/logstorage/pipe_field_values.go
new file mode 100644
index 0000000000..d9c1f57acb
--- /dev/null
+++ b/lib/logstorage/pipe_field_values.go
@@ -0,0 +1,93 @@
+package logstorage
+
+import (
+	"fmt"
+)
+
+// pipeFieldValues processes '| field_values ...' queries.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe
+type pipeFieldValues struct {
+	field string
+
+	limit uint64
+}
+
+func (pf *pipeFieldValues) String() string {
+	s := "field_values " + quoteTokenIfNeeded(pf.field)
+	if pf.limit > 0 {
+		s += fmt.Sprintf(" limit %d", pf.limit)
+	}
+	return s
+}
+
+func (pf *pipeFieldValues) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	if neededFields.contains("*") {
+		neededFields.reset()
+		if !unneededFields.contains(pf.field) {
+			neededFields.add(pf.field)
+		}
+		unneededFields.reset()
+	} else {
+		neededFieldsOrig := neededFields.clone()
+		neededFields.reset()
+		if neededFieldsOrig.contains(pf.field) {
+			neededFields.add(pf.field)
+		}
+	}
+}
+
+func (pf *pipeFieldValues) optimize() {
+	// nothing to do
+}
+
+func (pf *pipeFieldValues) hasFilterInWithQuery() bool {
+	return false
+}
+
+func (pf *pipeFieldValues) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
+	return pf, nil
+}
+
+func (pf *pipeFieldValues) newPipeProcessor(workersCount int, stopCh <-chan struct{}, cancel func(), ppNext pipeProcessor) pipeProcessor {
+	hitsFieldName := "hits"
+	if hitsFieldName == pf.field {
+		hitsFieldName = "hitss"
+	}
+	pu := &pipeUniq{
+		byFields:      []string{pf.field},
+		hitsFieldName: hitsFieldName,
+		limit:         pf.limit,
+	}
+	return pu.newPipeProcessor(workersCount, stopCh, cancel, ppNext)
+}
+
+func parsePipeFieldValues(lex *lexer) (*pipeFieldValues, error) {
+	if !lex.isKeyword("field_values") {
+		return nil, fmt.Errorf("expecting 'field_values'; got %q", lex.token)
+	}
+	lex.nextToken()
+
+	field, err := parseFieldName(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse field name for 'field_values': %w", err)
+	}
+
+	limit := uint64(0)
+	if lex.isKeyword("limit") {
+		lex.nextToken()
+		n, ok := tryParseUint64(lex.token)
+		if !ok {
+			return nil, fmt.Errorf("cannot parse 'limit %s'", lex.token)
+		}
+		lex.nextToken()
+		limit = n
+	}
+
+	pf := &pipeFieldValues{
+		field: field,
+		limit: limit,
+	}
+
+	return pf, nil
+}
diff --git a/lib/logstorage/pipe_field_values_test.go b/lib/logstorage/pipe_field_values_test.go
new file mode 100644
index 0000000000..26b0419b60
--- /dev/null
+++ b/lib/logstorage/pipe_field_values_test.go
@@ -0,0 +1,148 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestParsePipeFieldValuesSuccess(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeSuccess(t, pipeStr)
+	}
+
+	f(`field_values x`)
+	f(`field_values x limit 10`)
+}
+
+func TestParsePipeFieldValuesFailure(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeFailure(t, pipeStr)
+	}
+
+	f(`field_values`)
+	f(`field_values a b`)
+	f(`field_values a limit`)
+	f(`field_values limit N`)
+}
+
+func TestPipeFieldValues(t *testing.T) {
+	f := func(pipeStr string, rows, rowsExpected [][]Field) {
+		t.Helper()
+		expectPipeResults(t, pipeStr, rows, rowsExpected)
+	}
+
+	f("field_values a", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"hits", "3"},
+		},
+	})
+
+	f("field_values b", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"b", "3"},
+			{"hits", "2"},
+		},
+		{
+			{"b", "54"},
+			{"hits", "1"},
+		},
+	})
+
+	f("field_values c", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"c", ""},
+			{"hits", "2"},
+		},
+		{
+			{"c", "d"},
+			{"hits", "1"},
+		},
+	})
+
+	f("field_values d", [][]Field{
+		{
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"a", "2"},
+			{"b", "3"},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+			{"c", "d"},
+		},
+	}, [][]Field{
+		{
+			{"d", ""},
+			{"hits", "3"},
+		},
+	})
+}
+
+func TestPipeFieldValuesUpdateNeededFields(t *testing.T) {
+	f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("field_values x", "*", "", "x", "")
+
+	// all the needed fields, unneeded fields do not intersect with src
+	f("field_values x", "*", "f1,f2", "x", "")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("field_values x", "*", "f1,x", "", "")
+
+	// needed fields do not intersect with src
+	f("field_values x", "f1,f2", "", "", "")
+
+	// needed fields intersect with src
+	f("field_values x", "f1,x", "", "x", "")
+}
diff --git a/lib/logstorage/pipe_filter.go b/lib/logstorage/pipe_filter.go
index daba8f5bc3..c1f418f60f 100644
--- a/lib/logstorage/pipe_filter.go
+++ b/lib/logstorage/pipe_filter.go
@@ -108,11 +108,13 @@ func (pfp *pipeFilterProcessor) flush() error {
 	return nil
 }
 
-func parsePipeFilter(lex *lexer) (*pipeFilter, error) {
-	if !lex.isKeyword("filter") {
-		return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
+func parsePipeFilter(lex *lexer, needFilterKeyword bool) (*pipeFilter, error) {
+	if needFilterKeyword {
+		if !lex.isKeyword("filter") {
+			return nil, fmt.Errorf("expecting 'filter'; got %q", lex.token)
+		}
+		lex.nextToken()
 	}
-	lex.nextToken()
 
 	f, err := parseFilter(lex)
 	if err != nil {
diff --git a/lib/logstorage/pipe_filter_test.go b/lib/logstorage/pipe_filter_test.go
index 0c31830196..dc244ffb05 100644
--- a/lib/logstorage/pipe_filter_test.go
+++ b/lib/logstorage/pipe_filter_test.go
@@ -32,6 +32,14 @@ func TestPipeFilter(t *testing.T) {
 		expectPipeResults(t, pipeStr, rows, rowsExpected)
 	}
 
+	// filter mismatch, missing 'filter' prefix
+	f("abc", [][]Field{
+		{
+			{"_msg", `{"foo":"bar"}`},
+			{"a", `test`},
+		},
+	}, [][]Field{})
+
 	// filter mismatch
 	f("filter abc", [][]Field{
 		{
@@ -40,6 +48,19 @@ func TestPipeFilter(t *testing.T) {
 		},
 	}, [][]Field{})
 
+	// filter match, missing 'filter' prefix
+	f("foo", [][]Field{
+		{
+			{"_msg", `{"foo":"bar"}`},
+			{"a", `test`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `{"foo":"bar"}`},
+			{"a", `test`},
+		},
+	})
+
 	// filter match
 	f("filter foo", [][]Field{
 		{
diff --git a/lib/logstorage/pipe_format.go b/lib/logstorage/pipe_format.go
index de580110ec..7146f99ed6 100644
--- a/lib/logstorage/pipe_format.go
+++ b/lib/logstorage/pipe_format.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"strconv"
 	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
 // pipeFormat processes '| format ...' pipe.
@@ -169,8 +171,8 @@ func (pfp *pipeFormatProcessor) flush() error {
 }
 
 func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult, rowIdx int) string {
-	bb := bbPool.Get()
-	b := bb.B
+	b := shard.a.b
+	bLen := len(b)
 	for _, step := range pf.steps {
 		b = append(b, step.prefix...)
 		if step.field != "" {
@@ -183,11 +185,9 @@ func (shard *pipeFormatProcessorShard) formatRow(pf *pipeFormat, br *blockResult
 			}
 		}
 	}
-	bb.B = b
+	shard.a.b = b
 
-	v := shard.a.copyBytesToString(b)
-	bbPool.Put(bb)
-	return v
+	return bytesutil.ToUnsafeString(b[bLen:])
 }
 
 func parsePipeFormat(lex *lexer) (*pipeFormat, error) {
diff --git a/lib/logstorage/pipe_limit.go b/lib/logstorage/pipe_limit.go
index f480cb46c8..41f98fded5 100644
--- a/lib/logstorage/pipe_limit.go
+++ b/lib/logstorage/pipe_limit.go
@@ -88,15 +88,20 @@ func parsePipeLimit(lex *lexer) (*pipeLimit, error) {
 	if !lex.isKeyword("limit", "head") {
 		return nil, fmt.Errorf("expecting 'limit' or 'head'; got %q", lex.token)
 	}
+	lex.nextToken()
 
-	lex.nextToken()
-	n, err := parseUint(lex.token)
-	if err != nil {
-		return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err)
+	limit := uint64(10)
+	if !lex.isKeyword("|", ")", "") {
+		n, err := parseUint(lex.token)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse rows limit from %q: %w", lex.token, err)
+		}
+		lex.nextToken()
+		limit = n
 	}
-	lex.nextToken()
+
 	pl := &pipeLimit{
-		limit: n,
+		limit: limit,
 	}
 	return pl, nil
 }
diff --git a/lib/logstorage/pipe_limit_test.go b/lib/logstorage/pipe_limit_test.go
index bc7afa4fdc..23b176ee16 100644
--- a/lib/logstorage/pipe_limit_test.go
+++ b/lib/logstorage/pipe_limit_test.go
@@ -20,7 +20,6 @@ func TestParsePipeLimitFailure(t *testing.T) {
 		expectParsePipeFailure(t, pipeStr)
 	}
 
-	f(`limit`)
 	f(`limit -10`)
 	f(`limit foo`)
 }
@@ -30,6 +29,17 @@ func TestPipeLimit(t *testing.T) {
 		t.Helper()
 		expectPipeResults(t, pipeStr, rows, rowsExpected)
 	}
+	f("limit", [][]Field{
+		{
+			{"_msg", `{"foo":"bar"}`},
+			{"a", `test`},
+		},
+	}, [][]Field{
+		{
+			{"_msg", `{"foo":"bar"}`},
+			{"a", `test`},
+		},
+	})
 
 	f("limit 100", [][]Field{
 		{
diff --git a/lib/logstorage/pipe_math.go b/lib/logstorage/pipe_math.go
new file mode 100644
index 0000000000..47f7ed73a6
--- /dev/null
+++ b/lib/logstorage/pipe_math.go
@@ -0,0 +1,776 @@
+package logstorage
+
+import (
+	"fmt"
+	"math"
+	"strings"
+	"unsafe"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
+)
+
+// pipeMath processes '| math ...' pipe.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe
+type pipeMath struct {
+	entries []*mathEntry
+}
+
+type mathEntry struct {
+	// The calculated expr result is stored in resultField.
+	resultField string
+
+	// expr is the expression to calculate.
+	expr *mathExpr
+}
+
+type mathExpr struct {
+	// if isConst is set, then the given mathExpr returns the given constValue.
+	isConst    bool
+	constValue float64
+
+	// constValueStr is the original string representation of constValue.
+	//
+	// It is used in String() method for returning the original representation of the given constValue.
+	constValueStr string
+
+	// if fieldName isn't empty, then the given mathExpr fetches numeric values from the given fieldName.
+	fieldName string
+
+	// args are args for the given mathExpr.
+	args []*mathExpr
+
+	// op is the operation name (aka function name) for the given mathExpr.
+	op string
+
+	// f is the function for calculating results for the given mathExpr.
+	f mathFunc
+
+	// whether the mathExpr was wrapped in parens.
+	wrappedInParens bool
+}
+
+// mathFunc must fill result with calculated results based on the given args.
+type mathFunc func(result []float64, args [][]float64)
+
+func (pm *pipeMath) String() string {
+	s := "math"
+	a := make([]string, len(pm.entries))
+	for i, e := range pm.entries {
+		a[i] = e.String()
+	}
+	s += " " + strings.Join(a, ", ")
+	return s
+}
+
+func (me *mathEntry) String() string {
+	s := me.expr.String()
+	if isMathBinaryOp(me.expr.op) {
+		s = "(" + s + ")"
+	}
+	s += " as " + quoteTokenIfNeeded(me.resultField)
+	return s
+}
+
+func (me *mathExpr) String() string {
+	if me.isConst {
+		return me.constValueStr
+	}
+	if me.fieldName != "" {
+		return quoteTokenIfNeeded(me.fieldName)
+	}
+
+	args := me.args
+	if isMathBinaryOp(me.op) {
+		opPriority := getMathBinaryOpPriority(me.op)
+		left := me.args[0]
+		right := me.args[1]
+		leftStr := left.String()
+		rightStr := right.String()
+		if isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > opPriority {
+			leftStr = "(" + leftStr + ")"
+		}
+		if isMathBinaryOp(right.op) && getMathBinaryOpPriority(right.op) > opPriority {
+			rightStr = "(" + rightStr + ")"
+		}
+		return fmt.Sprintf("%s %s %s", leftStr, me.op, rightStr)
+	}
+
+	if me.op == "unary_minus" {
+		argStr := args[0].String()
+		if isMathBinaryOp(args[0].op) {
+			argStr = "(" + argStr + ")"
+		}
+		return "-" + argStr
+	}
+
+	a := make([]string, len(args))
+	for i, arg := range args {
+		a[i] = arg.String()
+	}
+	argsStr := strings.Join(a, ", ")
+	return fmt.Sprintf("%s(%s)", me.op, argsStr)
+}
+
+func isMathBinaryOp(op string) bool {
+	_, ok := mathBinaryOps[op]
+	return ok
+}
+
+func getMathBinaryOpPriority(op string) int {
+	bo, ok := mathBinaryOps[op]
+	if !ok {
+		logger.Panicf("BUG: unexpected binary op: %q", op)
+	}
+	return bo.priority
+}
+
+func getMathFuncForBinaryOp(op string) (mathFunc, error) {
+	bo, ok := mathBinaryOps[op]
+	if !ok {
+		return nil, fmt.Errorf("unsupported binary operation: %q", op)
+	}
+	return bo.f, nil
+}
+
+var mathBinaryOps = map[string]mathBinaryOp{
+	"^": {
+		priority: 1,
+		f:        mathFuncPow,
+	},
+	"*": {
+		priority: 2,
+		f:        mathFuncMul,
+	},
+	"/": {
+		priority: 2,
+		f:        mathFuncDiv,
+	},
+	"%": {
+		priority: 2,
+		f:        mathFuncMod,
+	},
+	"+": {
+		priority: 3,
+		f:        mathFuncPlus,
+	},
+	"-": {
+		priority: 3,
+		f:        mathFuncMinus,
+	},
+}
+
+type mathBinaryOp struct {
+	priority int
+	f        mathFunc
+}
+
+func (pm *pipeMath) updateNeededFields(neededFields, unneededFields fieldsSet) {
+	for i := len(pm.entries) - 1; i >= 0; i-- {
+		e := pm.entries[i]
+		if neededFields.contains("*") {
+			if !unneededFields.contains(e.resultField) {
+				unneededFields.add(e.resultField)
+
+				entryFields := e.getNeededFields()
+				unneededFields.removeFields(entryFields)
+			}
+		} else {
+			if neededFields.contains(e.resultField) {
+				neededFields.remove(e.resultField)
+
+				entryFields := e.getNeededFields()
+				neededFields.addFields(entryFields)
+			}
+		}
+	}
+}
+
+func (me *mathEntry) getNeededFields() []string {
+	neededFields := newFieldsSet()
+	me.expr.updateNeededFields(neededFields)
+	return neededFields.getAll()
+}
+
+func (me *mathExpr) updateNeededFields(neededFields fieldsSet) {
+	if me.isConst {
+		return
+	}
+	if me.fieldName != "" {
+		neededFields.add(me.fieldName)
+		return
+	}
+	for _, arg := range me.args {
+		arg.updateNeededFields(neededFields)
+	}
+}
+
+func (pm *pipeMath) optimize() {
+	// nothing to do
+}
+
+func (pm *pipeMath) hasFilterInWithQuery() bool {
+	return false
+}
+
+func (pm *pipeMath) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
+	return pm, nil
+}
+
+func (pm *pipeMath) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
+	pmp := &pipeMathProcessor{
+		pm:     pm,
+		ppNext: ppNext,
+
+		shards: make([]pipeMathProcessorShard, workersCount),
+	}
+	return pmp
+}
+
+type pipeMathProcessor struct {
+	pm     *pipeMath
+	ppNext pipeProcessor
+
+	shards []pipeMathProcessorShard
+}
+
+type pipeMathProcessorShard struct {
+	pipeMathProcessorShardNopad
+
+	// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
+	_ [128 - unsafe.Sizeof(pipeMathProcessorShardNopad{})%128]byte
+}
+
+type pipeMathProcessorShardNopad struct {
+	// a holds all the data for rcs.
+	a arena
+
+	// rcs is used for storing calculated results before they are written to ppNext.
+	rcs []resultColumn
+
+	// rs is storage for temporary results
+	rs [][]float64
+
+	// rsBuf is backing storage for rs slices
+	rsBuf []float64
+}
+
+func (shard *pipeMathProcessorShard) executeMathEntry(e *mathEntry, rc *resultColumn, br *blockResult) {
+	clear(shard.rs)
+	shard.rs = shard.rs[:0]
+	shard.rsBuf = shard.rsBuf[:0]
+
+	shard.executeExpr(e.expr, br)
+	r := shard.rs[0]
+
+	b := shard.a.b
+	for _, f := range r {
+		bLen := len(b)
+		b = marshalFloat64String(b, f)
+		v := bytesutil.ToUnsafeString(b[bLen:])
+		rc.addValue(v)
+	}
+	shard.a.b = b
+}
+
+func (shard *pipeMathProcessorShard) executeExpr(me *mathExpr, br *blockResult) {
+	rIdx := len(shard.rs)
+	shard.rs = slicesutil.SetLength(shard.rs, len(shard.rs)+1)
+
+	shard.rsBuf = slicesutil.SetLength(shard.rsBuf, len(shard.rsBuf)+len(br.timestamps))
+	shard.rs[rIdx] = shard.rsBuf[len(shard.rsBuf)-len(br.timestamps):]
+
+	if me.isConst {
+		r := shard.rs[rIdx]
+		for i := range br.timestamps {
+			r[i] = me.constValue
+		}
+		return
+	}
+	if me.fieldName != "" {
+		c := br.getColumnByName(me.fieldName)
+		values := c.getValues(br)
+		r := shard.rs[rIdx]
+		var f float64
+		for i, v := range values {
+			if i == 0 || v != values[i-1] {
+				var ok bool
+				f, ok = tryParseFloat64(v)
+				if !ok {
+					f = nan
+				}
+			}
+			r[i] = f
+		}
+		return
+	}
+
+	rsBufLen := len(shard.rsBuf)
+	for _, arg := range me.args {
+		shard.executeExpr(arg, br)
+	}
+
+	result := shard.rs[rIdx]
+	args := shard.rs[rIdx+1:]
+	me.f(result, args)
+
+	shard.rs = shard.rs[:rIdx+1]
+	shard.rsBuf = shard.rsBuf[:rsBufLen]
+}
+
+func (pmp *pipeMathProcessor) writeBlock(workerID uint, br *blockResult) {
+	if len(br.timestamps) == 0 {
+		return
+	}
+
+	shard := &pmp.shards[workerID]
+	entries := pmp.pm.entries
+
+	shard.rcs = slicesutil.SetLength(shard.rcs, len(entries))
+	rcs := shard.rcs
+	for i, e := range entries {
+		rc := &rcs[i]
+		rc.name = e.resultField
+		shard.executeMathEntry(e, rc, br)
+		br.addResultColumn(rc)
+	}
+
+	pmp.ppNext.writeBlock(workerID, br)
+
+	for i := range rcs {
+		rcs[i].resetValues()
+	}
+	shard.a.reset()
+}
+
+func (pmp *pipeMathProcessor) flush() error {
+	return nil
+}
+
+func parsePipeMath(lex *lexer) (*pipeMath, error) {
+	if !lex.isKeyword("math") {
+		return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "math")
+	}
+	lex.nextToken()
+
+	var mes []*mathEntry
+	for {
+		me, err := parseMathEntry(lex)
+		if err != nil {
+			return nil, err
+		}
+		mes = append(mes, me)
+
+		switch {
+		case lex.isKeyword(","):
+			lex.nextToken()
+		case lex.isKeyword("|", ")", ""):
+			if len(mes) == 0 {
+				return nil, fmt.Errorf("missing 'math' expressions")
+			}
+			pm := &pipeMath{
+				entries: mes,
+			}
+			return pm, nil
+		default:
+			return nil, fmt.Errorf("unexpected token after 'math' expression [%s]: %q; expecting ',', '|' or ')'", mes[len(mes)-1], lex.token)
+		}
+	}
+}
+
+func parseMathEntry(lex *lexer) (*mathEntry, error) {
+	me, err := parseMathExpr(lex)
+	if err != nil {
+		return nil, err
+	}
+
+	// skip optional 'as'
+	if lex.isKeyword("as") {
+		lex.nextToken()
+	}
+
+	resultField, err := parseFieldName(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse result name for [%s]: %w", me, err)
+	}
+
+	e := &mathEntry{
+		resultField: resultField,
+		expr:        me,
+	}
+	return e, nil
+}
+
+func parseMathExpr(lex *lexer) (*mathExpr, error) {
+	// parse left operand
+	left, err := parseMathExprOperand(lex)
+	if err != nil {
+		return nil, err
+	}
+
+	for {
+		if !isMathBinaryOp(lex.token) {
+			// There is no right operand
+			return left, nil
+		}
+
+		// parse operator
+		op := lex.token
+		lex.nextToken()
+
+		f, err := getMathFuncForBinaryOp(op)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse operator after [%s]: %w", left, err)
+		}
+
+		// parse right operand
+		right, err := parseMathExprOperand(lex)
+		if err != nil {
+			return nil, fmt.Errorf("cannot parse operand after [%s %s]: %w", left, op, err)
+		}
+
+		me := &mathExpr{
+			args: []*mathExpr{left, right},
+			op:   op,
+			f:    f,
+		}
+
+		// balance operands according to their priority
+		if !left.wrappedInParens && isMathBinaryOp(left.op) && getMathBinaryOpPriority(left.op) > getMathBinaryOpPriority(op) {
+			me.args[0] = left.args[1]
+			left.args[1] = me
+			me = left
+		}
+
+		left = me
+	}
+}
+
+func parseMathExprInParens(lex *lexer) (*mathExpr, error) {
+	if !lex.isKeyword("(") {
+		return nil, fmt.Errorf("missing '('")
+	}
+	lex.nextToken()
+
+	me, err := parseMathExpr(lex)
+	if err != nil {
+		return nil, err
+	}
+	me.wrappedInParens = true
+
+	if !lex.isKeyword(")") {
+		return nil, fmt.Errorf("missing ')'; got %q instead", lex.token)
+	}
+	lex.nextToken()
+	return me, nil
+}
+
+func parseMathExprOperand(lex *lexer) (*mathExpr, error) {
+	if lex.isKeyword("(") {
+		return parseMathExprInParens(lex)
+	}
+
+	switch {
+	case lex.isKeyword("abs"):
+		return parseMathExprAbs(lex)
+	case lex.isKeyword("max"):
+		return parseMathExprMax(lex)
+	case lex.isKeyword("min"):
+		return parseMathExprMin(lex)
+	case lex.isKeyword("round"):
+		return parseMathExprRound(lex)
+	case lex.isKeyword("-"):
+		return parseMathExprUnaryMinus(lex)
+	case lex.isKeyword("+"):
+		// just skip unary plus
+		lex.nextToken()
+		return parseMathExprOperand(lex)
+	case lex.isNumber():
+		return parseMathExprConstNumber(lex)
+	default:
+		return parseMathExprFieldName(lex)
+	}
+}
+
+func parseMathExprAbs(lex *lexer) (*mathExpr, error) {
+	me, err := parseMathExprGenericFunc(lex, "abs", mathFuncAbs)
+	if err != nil {
+		return nil, err
+	}
+	if len(me.args) != 1 {
+		return nil, fmt.Errorf("'abs' function accepts only one arg; got %d args: [%s]", len(me.args), me)
+	}
+	return me, nil
+}
+
+func parseMathExprMax(lex *lexer) (*mathExpr, error) {
+	me, err := parseMathExprGenericFunc(lex, "max", mathFuncMax)
+	if err != nil {
+		return nil, err
+	}
+	if len(me.args) < 2 {
+		return nil, fmt.Errorf("'max' function needs at least 2 args; got %d args: [%s]", len(me.args), me)
+	}
+	return me, nil
+}
+
+func parseMathExprMin(lex *lexer) (*mathExpr, error) {
+	me, err := parseMathExprGenericFunc(lex, "min", mathFuncMin)
+	if err != nil {
+		return nil, err
+	}
+	if len(me.args) < 2 {
+		return nil, fmt.Errorf("'min' function needs at least 2 args; got %d args: [%s]", len(me.args), me)
+	}
+	return me, nil
+}
+
+func parseMathExprRound(lex *lexer) (*mathExpr, error) {
+	me, err := parseMathExprGenericFunc(lex, "round", mathFuncRound)
+	if err != nil {
+		return nil, err
+	}
+	if len(me.args) != 1 && len(me.args) != 2 {
+		return nil, fmt.Errorf("'round' function needs 1 or 2 args; got %d args: [%s]", len(me.args), me)
+	}
+	return me, nil
+}
+
+func parseMathExprGenericFunc(lex *lexer, funcName string, f mathFunc) (*mathExpr, error) {
+	if !lex.isKeyword(funcName) {
+		return nil, fmt.Errorf("missing %q keyword", funcName)
+	}
+	lex.nextToken()
+
+	args, err := parseMathFuncArgs(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse args for %q function: %w", funcName, err)
+	}
+	if len(args) == 0 {
+		return nil, fmt.Errorf("%q function needs at least one org", funcName)
+	}
+	me := &mathExpr{
+		args: args,
+		op:   funcName,
+		f:    f,
+	}
+	return me, nil
+}
+
+func parseMathFuncArgs(lex *lexer) ([]*mathExpr, error) {
+	if !lex.isKeyword("(") {
+		return nil, fmt.Errorf("missing '('")
+	}
+	lex.nextToken()
+
+	var args []*mathExpr
+	for {
+		if lex.isKeyword(")") {
+			lex.nextToken()
+			return args, nil
+		}
+
+		me, err := parseMathExpr(lex)
+		if err != nil {
+			return nil, err
+		}
+		args = append(args, me)
+
+		switch {
+		case lex.isKeyword(")"):
+		case lex.isKeyword(","):
+			lex.nextToken()
+		default:
+			return nil, fmt.Errorf("unexpected token after [%s]: %q; want ',' or ')'", me, lex.token)
+		}
+	}
+}
+
+func parseMathExprUnaryMinus(lex *lexer) (*mathExpr, error) {
+	if !lex.isKeyword("-") {
+		return nil, fmt.Errorf("missing '-'")
+	}
+	lex.nextToken()
+
+	expr, err := parseMathExprOperand(lex)
+	if err != nil {
+		return nil, err
+	}
+	me := &mathExpr{
+		args: []*mathExpr{expr},
+		op:   "unary_minus",
+		f:    mathFuncUnaryMinus,
+	}
+	return me, nil
+}
+
+func parseMathExprConstNumber(lex *lexer) (*mathExpr, error) {
+	if !lex.isNumber() {
+		return nil, fmt.Errorf("cannot parse number from %q", lex.token)
+	}
+	numStr, err := getCompoundMathToken(lex)
+	if err != nil {
+		return nil, fmt.Errorf("cannot parse number: %w", err)
+	}
+	f, ok := tryParseNumber(numStr)
+	if !ok {
+		return nil, fmt.Errorf("cannot parse number from %q", numStr)
+	}
+	me := &mathExpr{
+		isConst:       true,
+		constValue:    f,
+		constValueStr: numStr,
+	}
+	return me, nil
+}
+
+func parseMathExprFieldName(lex *lexer) (*mathExpr, error) {
+	fieldName, err := getCompoundMathToken(lex)
+	if err != nil {
+		return nil, err
+	}
+	fieldName = getCanonicalColumnName(fieldName)
+	me := &mathExpr{
+		fieldName: fieldName,
+	}
+	return me, nil
+}
+
+func getCompoundMathToken(lex *lexer) (string, error) {
+	stopTokens := []string{"=", "+", "-", "*", "/", "%", "^", ",", ")", "|", ""}
+	if lex.isKeyword(stopTokens...) {
+		return "", fmt.Errorf("compound token cannot start with '%s'", lex.token)
+	}
+
+	s := lex.token
+	rawS := lex.rawToken
+	lex.nextToken()
+	suffix := ""
+	for !lex.isSkippedSpace && !lex.isKeyword(stopTokens...) {
+		s += lex.token
+		lex.nextToken()
+	}
+	if suffix == "" {
+		return s, nil
+	}
+	return rawS + suffix, nil
+}
+
+func mathFuncPlus(result []float64, args [][]float64) {
+	a := args[0]
+	b := args[1]
+	for i := range result {
+		result[i] = a[i] + b[i]
+	}
+}
+
+func mathFuncMinus(result []float64, args [][]float64) {
+	a := args[0]
+	b := args[1]
+	for i := range result {
+		result[i] = a[i] - b[i]
+	}
+}
+
+func mathFuncMul(result []float64, args [][]float64) {
+	a := args[0]
+	b := args[1]
+	for i := range result {
+		result[i] = a[i] * b[i]
+	}
+}
+
+func mathFuncDiv(result []float64, args [][]float64) {
+	a := args[0]
+	b := args[1]
+	for i := range result {
+		result[i] = a[i] / b[i]
+	}
+}
+
+func mathFuncMod(result []float64, args [][]float64) {
+	a := args[0]
+	b := args[1]
+	for i := range result {
+		result[i] = math.Mod(a[i], b[i])
+	}
+}
+
+func mathFuncPow(result []float64, args [][]float64) {
+	a := args[0]
+	b := args[1]
+	for i := range result {
+		result[i] = math.Pow(a[i], b[i])
+	}
+}
+
+func mathFuncAbs(result []float64, args [][]float64) {
+	arg := args[0]
+	for i := range result {
+		result[i] = math.Abs(arg[i])
+	}
+}
+
+func mathFuncUnaryMinus(result []float64, args [][]float64) {
+	arg := args[0]
+	for i := range result {
+		result[i] = -arg[i]
+	}
+}
+
+func mathFuncMax(result []float64, args [][]float64) {
+	for i := range result {
+		f := nan
+		for _, arg := range args {
+			if math.IsNaN(f) || arg[i] > f {
+				f = arg[i]
+			}
+		}
+		result[i] = f
+	}
+}
+
+func mathFuncMin(result []float64, args [][]float64) {
+	for i := range result {
+		f := nan
+		for _, arg := range args {
+			if math.IsNaN(f) || arg[i] < f {
+				f = arg[i]
+			}
+		}
+		result[i] = f
+	}
+}
+
+func mathFuncRound(result []float64, args [][]float64) {
+	arg := args[0]
+	if len(args) == 1 {
+		// Round to integer
+		for i := range result {
+			result[i] = math.Round(arg[i])
+		}
+		return
+	}
+
+	// Round to nearest
+	nearest := args[1]
+	var f float64
+	for i := range result {
+		if i == 0 || arg[i-1] != arg[i] || nearest[i-1] != nearest[i] {
+			f = round(arg[i], nearest[i])
+		}
+		result[i] = f
+	}
+}
+
+func round(f, nearest float64) float64 {
+	_, e := decimal.FromFloat(nearest)
+	p10 := math.Pow10(int(-e))
+	f += 0.5 * math.Copysign(nearest, f)
+	f -= math.Mod(f, nearest)
+	f, _ = math.Modf(f * p10)
+	return f / p10
+}
diff --git a/lib/logstorage/pipe_math_test.go b/lib/logstorage/pipe_math_test.go
new file mode 100644
index 0000000000..66a9da24f8
--- /dev/null
+++ b/lib/logstorage/pipe_math_test.go
@@ -0,0 +1,233 @@
+package logstorage
+
+import (
+	"testing"
+)
+
+func TestParsePipeMathSuccess(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeSuccess(t, pipeStr)
+	}
+
+	f(`math b as a`)
+	f(`math -123 as a`)
+	f(`math 12.345KB as a`)
+	f(`math (-2 + 2) as a`)
+	f(`math x as a, z as y`)
+	f(`math (foo / bar + baz * abc % -45ms) as a`)
+	f(`math (foo / (bar + baz) * abc ^ 2) as a`)
+	f(`math (foo / ((bar + baz) * abc) ^ -2) as a`)
+	f(`math (foo + bar / baz - abc) as a`)
+	f(`math min(3, foo, (1 + bar) / baz) as a, max(a, b) as b, (abs(c) + 5) as d`)
+	f(`math round(foo) as x`)
+	f(`math round(foo, 0.1) as y`)
+}
+
+func TestParsePipeMathFailure(t *testing.T) {
+	f := func(pipeStr string) {
+		t.Helper()
+		expectParsePipeFailure(t, pipeStr)
+	}
+
+	f(`math`)
+	f(`math x`)
+	f(`math x as`)
+	f(`math abs() as x`)
+	f(`math abs(a, b) as x`)
+	f(`math min() as x`)
+	f(`math min(a) as x`)
+	f(`math max() as x`)
+	f(`math max(a) as x`)
+	f(`math round() as x`)
+	f(`math round(a, b, c) as x`)
+}
+
+func TestPipeMath(t *testing.T) {
+	f := func(pipeStr string, rows, rowsExpected [][]Field) {
+		t.Helper()
+		expectPipeResults(t, pipeStr, rows, rowsExpected)
+	}
+
+	f("math b+1 as a, a*2 as b, b-10.5+c as c", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "3"},
+			{"b", "6"},
+			{"c", "-1.5"},
+		},
+	})
+
+	f("math 1 as a", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	})
+
+	f("math 10 * 5 - 3 a", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "47"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	})
+
+	f("math -1.5K as a", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "-1500"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	})
+
+	f("math b as a", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "2"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	})
+
+	f("math a as a", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "NaN"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	})
+
+	f("math 2*c + b as x", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+			{"x", "8"},
+		},
+	})
+
+	f("math abs(-min(a,b)) as min, round(max(40*b/30,c)) as max", [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+	}, [][]Field{
+		{
+			{"a", "v1"},
+			{"b", "2"},
+			{"c", "3"},
+			{"min", "2"},
+			{"max", "3"},
+		},
+	})
+
+	f("math round((2*c + (b%c))/(c-b)^(b-1), -0.001) as a", [][]Field{
+		{
+			{"a", "v"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+		{
+			{"a", "x"},
+			{"b", "3"},
+			{"c", "5"},
+		},
+		{
+			{"b", "3"},
+			{"c", "6"},
+		},
+	}, [][]Field{
+		{
+			{"a", "8"},
+			{"b", "2"},
+			{"c", "3"},
+		},
+		{
+			{"a", "3.25"},
+			{"b", "3"},
+			{"c", "5"},
+		},
+		{
+			{"a", "1.667"},
+			{"b", "3"},
+			{"c", "6"},
+		},
+	})
+}
+
+func TestPipeMathUpdateNeededFields(t *testing.T) {
+	f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
+		t.Helper()
+		expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
+	}
+
+	// all the needed fields
+	f("math (x + 1) as y", "*", "", "*", "y")
+
+	// all the needed fields, unneeded fields do not intersect with src and dst
+	f("math (x + 1) as y", "*", "f1,f2", "*", "f1,f2,y")
+
+	// all the needed fields, unneeded fields intersect with src
+	f("math (x + 1) as y", "*", "f1,x", "*", "f1,y")
+
+	// all the needed fields, unneeded fields intersect with dst
+	f("math (x + 1) as y", "*", "f1,y", "*", "f1,y")
+
+	// all the needed fields, unneeded fields intersect with src and dst
+	f("math (x + 1) as y", "*", "f1,x,y", "*", "f1,x,y")
+
+	// needed fields do not intersect with src and dst
+	f("math (x + 1) as y", "f1,f2", "", "f1,f2", "")
+
+	// needed fields intersect with src
+	f("math (x + 1) as y", "f1,x", "", "f1,x", "")
+
+	// needed fields intersect with dst
+	f("math (x + 1) as y", "f1,y", "", "f1,x", "")
+
+	// needed fields intersect with src and dst
+	f("math (x + 1) as y", "f1,x,y", "", "f1,x", "")
+}
diff --git a/lib/logstorage/pipe_replace.go b/lib/logstorage/pipe_replace.go
index 13d66c5a86..10e36169db 100644
--- a/lib/logstorage/pipe_replace.go
+++ b/lib/logstorage/pipe_replace.go
@@ -3,6 +3,8 @@ package logstorage
 import (
 	"fmt"
 	"strings"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
 // pipeReplace processes '| replace ...' pipe.
@@ -59,11 +61,9 @@ func (pr *pipeReplace) initFilterInValues(cache map[string][]string, getFieldVal
 
 func (pr *pipeReplace) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	updateFunc := func(a *arena, v string) string {
-		bb := bbPool.Get()
-		bb.B = appendReplace(bb.B[:0], v, pr.oldSubstr, pr.newSubstr, pr.limit)
-		result := a.copyBytesToString(bb.B)
-		bbPool.Put(bb)
-		return result
+		bLen := len(a.b)
+		a.b = appendReplace(a.b, v, pr.oldSubstr, pr.newSubstr, pr.limit)
+		return bytesutil.ToUnsafeString(a.b[bLen:])
 	}
 
 	return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
diff --git a/lib/logstorage/pipe_replace_regexp.go b/lib/logstorage/pipe_replace_regexp.go
index 24aa5418c2..43c951f589 100644
--- a/lib/logstorage/pipe_replace_regexp.go
+++ b/lib/logstorage/pipe_replace_regexp.go
@@ -3,6 +3,8 @@ package logstorage
 import (
 	"fmt"
 	"regexp"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 )
 
 // pipeReplaceRegexp processes '| replace_regexp ...' pipe.
@@ -59,11 +61,9 @@ func (pr *pipeReplaceRegexp) initFilterInValues(cache map[string][]string, getFi
 
 func (pr *pipeReplaceRegexp) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
 	updateFunc := func(a *arena, v string) string {
-		bb := bbPool.Get()
-		bb.B = appendReplaceRegexp(bb.B[:0], v, pr.re, pr.replacement, pr.limit)
-		result := a.copyBytesToString(bb.B)
-		bbPool.Put(bb)
-		return result
+		bLen := len(a.b)
+		a.b = appendReplaceRegexp(a.b, v, pr.re, pr.replacement, pr.limit)
+		return bytesutil.ToUnsafeString(a.b[bLen:])
 	}
 
 	return newPipeUpdateProcessor(workersCount, updateFunc, ppNext, pr.field, pr.iff)
diff --git a/lib/logstorage/pipe_stats.go b/lib/logstorage/pipe_stats.go
index 90fdb0c73c..ab3852e959 100644
--- a/lib/logstorage/pipe_stats.go
+++ b/lib/logstorage/pipe_stats.go
@@ -537,13 +537,14 @@ func (psp *pipeStatsProcessor) flush() error {
 	return nil
 }
 
-func parsePipeStats(lex *lexer) (*pipeStats, error) {
-	if !lex.isKeyword("stats") {
-		return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token)
+func parsePipeStats(lex *lexer, needStatsKeyword bool) (*pipeStats, error) {
+	if needStatsKeyword {
+		if !lex.isKeyword("stats") {
+			return nil, fmt.Errorf("expecting 'stats'; got %q", lex.token)
+		}
+		lex.nextToken()
 	}
 
-	lex.nextToken()
-
 	var ps pipeStats
 	if lex.isKeyword("by", "(") {
 		if lex.isKeyword("by") {
diff --git a/lib/logstorage/pipe_stats_test.go b/lib/logstorage/pipe_stats_test.go
index 0d2cdd4c95..5363518db2 100644
--- a/lib/logstorage/pipe_stats_test.go
+++ b/lib/logstorage/pipe_stats_test.go
@@ -39,6 +39,70 @@ func TestPipeStats(t *testing.T) {
 		expectPipeResults(t, pipeStr, rows, rowsExpected)
 	}
 
+	// missing 'stats' keyword
+	f("count(*) as rows", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"rows", "3"},
+		},
+	})
+
+	// missing 'stats' keyword
+	f("count() as rows, count() if (a:2) rows2", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"rows", "3"},
+			{"rows2", "2"},
+		},
+	})
+
+	f("stats count() as rows, count() if (a:2) rows2", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"rows", "3"},
+			{"rows2", "2"},
+		},
+	})
+
 	f("stats count(*) as rows", [][]Field{
 		{
 			{"_msg", `abc`},
@@ -141,6 +205,32 @@ func TestPipeStats(t *testing.T) {
 		},
 	})
 
+	// missing 'stats' keyword
+	f("by (a) count(*) as rows", [][]Field{
+		{
+			{"_msg", `abc`},
+			{"a", `2`},
+			{"b", `3`},
+		},
+		{
+			{"_msg", `def`},
+			{"a", `1`},
+		},
+		{
+			{"a", `2`},
+			{"b", `54`},
+		},
+	}, [][]Field{
+		{
+			{"a", "1"},
+			{"rows", "1"},
+		},
+		{
+			{"a", "2"},
+			{"rows", "2"},
+		},
+	})
+
 	f("stats by (a) count(*) as rows", [][]Field{
 		{
 			{"_msg", `abc`},
diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go
index b7861e03eb..fde04b3696 100644
--- a/lib/logstorage/pipe_unpack.go
+++ b/lib/logstorage/pipe_unpack.go
@@ -80,10 +80,12 @@ func (uctx *fieldsUnpackerContext) addField(name, value string) {
 	nameCopy := ""
 	fieldPrefix := uctx.fieldPrefix
 	if fieldPrefix != "" {
-		nameBuf := uctx.a.newBytes(len(fieldPrefix) + len(name))
-		copy(nameBuf, fieldPrefix)
-		copy(nameBuf[len(fieldPrefix):], name)
-		nameCopy = bytesutil.ToUnsafeString(nameBuf)
+		b := uctx.a.b
+		bLen := len(b)
+		b = append(b, fieldPrefix...)
+		b = append(b, name...)
+		uctx.a.b = b
+		nameCopy = bytesutil.ToUnsafeString(b[bLen:])
 	} else {
 		nameCopy = uctx.a.copyString(name)
 	}
diff --git a/lib/logstorage/pipe_utils_test.go b/lib/logstorage/pipe_utils_test.go
index af6c3b9fd1..418f235bc5 100644
--- a/lib/logstorage/pipe_utils_test.go
+++ b/lib/logstorage/pipe_utils_test.go
@@ -8,6 +8,34 @@ import (
 	"testing"
 )
 
+func expectParsePipeFailure(t *testing.T, pipeStr string) {
+	t.Helper()
+
+	lex := newLexer(pipeStr)
+	p, err := parsePipe(lex)
+	if err == nil && lex.isEnd() {
+		t.Fatalf("expecting error when parsing [%s]; parsed result: [%s]", pipeStr, p)
+	}
+}
+
+func expectParsePipeSuccess(t *testing.T, pipeStr string) {
+	t.Helper()
+
+	lex := newLexer(pipeStr)
+	p, err := parsePipe(lex)
+	if err != nil {
+		t.Fatalf("cannot parse [%s]: %s", pipeStr, err)
+	}
+	if !lex.isEnd() {
+		t.Fatalf("unexpected tail after parsing [%s]: [%s]", pipeStr, lex.s)
+	}
+
+	pipeStrResult := p.String()
+	if pipeStrResult != pipeStr {
+		t.Fatalf("unexpected string representation of pipe; got\n%s\nwant\n%s", pipeStrResult, pipeStr)
+	}
+}
+
 func expectPipeResults(t *testing.T, pipeStr string, rows, rowsExpected [][]Field) {
 	t.Helper()
 
diff --git a/lib/logstorage/storage_search.go b/lib/logstorage/storage_search.go
index f7e43b89b3..bf681fc0e4 100644
--- a/lib/logstorage/storage_search.go
+++ b/lib/logstorage/storage_search.go
@@ -229,12 +229,12 @@ func (s *Storage) getFieldValuesNoHits(ctx context.Context, tenantIDs []TenantID
 func (s *Storage) GetFieldValues(ctx context.Context, tenantIDs []TenantID, q *Query, fieldName string, limit uint64) ([]ValueWithHits, error) {
 	pipes := append([]pipe{}, q.pipes...)
 	quotedFieldName := quoteTokenIfNeeded(fieldName)
-	pipeStr := fmt.Sprintf("uniq by (%s) with hits limit %d", quotedFieldName, limit)
+	pipeStr := fmt.Sprintf("field_values %s limit %d", quotedFieldName, limit)
 	lex := newLexer(pipeStr)
 
-	pu, err := parsePipeUniq(lex)
+	pu, err := parsePipeFieldValues(lex)
 	if err != nil {
-		logger.Panicf("BUG: unexpected error when parsing 'uniq' pipe at [%s]: %s", pipeStr, err)
+		logger.Panicf("BUG: unexpected error when parsing 'field_values' pipe at [%s]: %s", pipeStr, err)
 	}
 
 	if !lex.isEnd() {
diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go
index f664a6f634..488c46c481 100644
--- a/lib/logstorage/storage_search_test.go
+++ b/lib/logstorage/storage_search_test.go
@@ -3,6 +3,7 @@ package logstorage
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -311,6 +312,157 @@ func TestStorageRunQuery(t *testing.T) {
 		tenantIDs := []TenantID{tenantID}
 		mustRunQuery(tenantIDs, q, writeBlock)
 	})
+	t.Run("field_names-all", func(t *testing.T) {
+		q := mustParseQuery("*")
+		names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{"_msg", 1155},
+			{"_stream", 1155},
+			{"_time", 1155},
+			{"instance", 1155},
+			{"job", 1155},
+			{"source-file", 1155},
+			{"stream-id", 1155},
+			{"tenant.id", 1155},
+		}
+		if !reflect.DeepEqual(names, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
+		}
+	})
+	t.Run("field_names-some", func(t *testing.T) {
+		q := mustParseQuery(`_stream:{instance=~"host-1:.+"}`)
+		names, err := s.GetFieldNames(context.Background(), allTenantIDs, q)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{"_msg", 385},
+			{"_stream", 385},
+			{"_time", 385},
+			{"instance", 385},
+			{"job", 385},
+			{"source-file", 385},
+			{"stream-id", 385},
+			{"tenant.id", 385},
+		}
+		if !reflect.DeepEqual(names, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
+		}
+	})
+	t.Run("field_values-nolimit", func(t *testing.T) {
+		q := mustParseQuery("*")
+		values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 0)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{`{instance="host-0:234",job="foobar"}`, 385},
+			{`{instance="host-1:234",job="foobar"}`, 385},
+			{`{instance="host-2:234",job="foobar"}`, 385},
+		}
+		if !reflect.DeepEqual(values, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
+		}
+	})
+	t.Run("field_values-limit", func(t *testing.T) {
+		q := mustParseQuery("*")
+		values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 3)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{`{instance="host-0:234",job="foobar"}`, 0},
+			{`{instance="host-1:234",job="foobar"}`, 0},
+			{`{instance="host-2:234",job="foobar"}`, 0},
+		}
+		if !reflect.DeepEqual(values, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
+		}
+	})
+	t.Run("field_values-limit", func(t *testing.T) {
+		q := mustParseQuery("instance:='host-1:234'")
+		values, err := s.GetFieldValues(context.Background(), allTenantIDs, q, "_stream", 4)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{`{instance="host-1:234",job="foobar"}`, 385},
+		}
+		if !reflect.DeepEqual(values, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
+		}
+	})
+	t.Run("stream_field_names", func(t *testing.T) {
+		q := mustParseQuery("*")
+		names, err := s.GetStreamFieldNames(context.Background(), allTenantIDs, q)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{"instance", 1155},
+			{"job", 1155},
+		}
+		if !reflect.DeepEqual(names, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
+		}
+	})
+	t.Run("stream_field_values-nolimit", func(t *testing.T) {
+		q := mustParseQuery("*")
+		values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 0)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{`host-0:234`, 385},
+			{`host-1:234`, 385},
+			{`host-2:234`, 385},
+		}
+		if !reflect.DeepEqual(values, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
+		}
+	})
+	t.Run("stream_field_values-limit", func(t *testing.T) {
+		q := mustParseQuery("*")
+		values, err := s.GetStreamFieldValues(context.Background(), allTenantIDs, q, "instance", 3)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{`host-0:234`, 385},
+			{`host-1:234`, 385},
+			{`host-2:234`, 385},
+		}
+		if !reflect.DeepEqual(values, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", values, resultExpected)
+		}
+	})
+	t.Run("streams", func(t *testing.T) {
+		q := mustParseQuery("*")
+		names, err := s.GetStreams(context.Background(), allTenantIDs, q, 0)
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+
+		resultExpected := []ValueWithHits{
+			{`{instance="host-0:234",job="foobar"}`, 385},
+			{`{instance="host-1:234",job="foobar"}`, 385},
+			{`{instance="host-2:234",job="foobar"}`, 385},
+		}
+		if !reflect.DeepEqual(names, resultExpected) {
+			t.Fatalf("unexpected result; got\n%v\nwant\n%v", names, resultExpected)
+		}
+	})
 
 	// Run more complex tests
 	f := func(t *testing.T, query string, rowsExpected [][]Field) {