diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go index fc3f65bba..f74c63735 100644 --- a/app/vlselect/logsql/logsql.go +++ b/app/vlselect/logsql/logsql.go @@ -298,11 +298,36 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req httpserver.Errorf(w, r, "%s", err) return } - if limit > 0 { - q.AddPipeLimit(uint64(limit)) - } bw := getBufferedWriter(w) + defer func() { + bw.FlushIgnoreErrors() + putBufferedWriter(bw) + }() + w.Header().Set("Content-Type", "application/stream+json") + + if limit > 0 { + if q.CanReturnLastNResults() { + rows, err := getLastNQueryResults(ctx, tenantIDs, q, limit) + if err != nil { + httpserver.Errorf(w, r, "%s", err) + return + } + bb := blockResultPool.Get() + b := bb.B + for i := range rows { + b = logstorage.MarshalFieldsToJSON(b[:0], rows[i].fields) + b = append(b, '\n') + bw.WriteIgnoreErrors(b) + } + bb.B = b + blockResultPool.Put(bb) + return + } + + q.AddPipeLimit(uint64(limit)) + q.Optimize() + } writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { if len(columns) == 0 || len(columns[0].Values) == 0 { @@ -317,20 +342,103 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req blockResultPool.Put(bb) } - w.Header().Set("Content-Type", "application/stream+json") - q.Optimize() - err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock) - - bw.FlushIgnoreErrors() - putBufferedWriter(bw) - - if err != nil { + if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil { httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err) } } var blockResultPool bytesutil.ByteBufferPool +type row struct { + timestamp int64 + fields []logstorage.Field +} + +func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { + q.AddPipeLimit(uint64(limit + 1)) + q.Optimize() + rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1) + if err != nil { + return nil, err + } + if len(rows) <= limit { + // Fast path - the requested time range contains up to limit rows. + sortRowsByTime(rows) + return rows, nil + } + + // Slow path - search for the time range with the requested limit rows. 
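+	// The search bisects the time range from the start side: if the narrowed range still
+	// matches more than limit rows, start is moved forward, otherwise it is moved backward.
+	// The loop finishes once the range matches close to limit rows (or the bisection step
+	// drops below ~10ms), and the matching rows with the biggest timestamps are returned.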
+ start, end := q.GetFilterTimeRange() + d := end/2 - start/2 + start += d + + qOrig := q + for { + q = qOrig.Clone() + q.AddTimeFilter(start, end) + rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1) + if err != nil { + return nil, err + } + + if len(rows) == limit || len(rows) > limit && d < 10e6 || d == 0 { + sortRowsByTime(rows) + if len(rows) > limit { + rows = rows[len(rows)-limit:] + } + return rows, nil + } + + lastBit := d & 1 + d /= 2 + if len(rows) > limit { + start += d + } else { + start -= d + lastBit + } + } +} + +func sortRowsByTime(rows []row) { + sort.Slice(rows, func(i, j int) bool { + return rows[i].timestamp < rows[j].timestamp + }) +} + +func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) { + ctxWithCancel, cancel := context.WithCancel(ctx) + defer cancel() + + var rows []row + var rowsLock sync.Mutex + writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) { + rowsLock.Lock() + defer rowsLock.Unlock() + + for i, timestamp := range timestamps { + fields := make([]logstorage.Field, len(columns)) + for j := range columns { + f := &fields[j] + f.Name = strings.Clone(columns[j].Name) + f.Value = strings.Clone(columns[j].Values[i]) + } + rows = append(rows, row{ + timestamp: timestamp, + fields: fields, + }) + } + + if len(rows) >= limit { + cancel() + } + } + if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock); err != nil { + return nil, err + } + + return rows, nil +} + func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, error) { // Extract tenantID tenantID, err := logstorage.GetTenantIDFromRequest(r) @@ -373,10 +481,10 @@ func getTimeNsec(r *http.Request, argName string) (int64, bool, error) { if s == "" { return 0, false, nil } - currentTimestamp := float64(time.Now().UnixNano()) / 1e9 - secs, err := promutils.ParseTimeAt(s, currentTimestamp) + currentTimestamp := time.Now().UnixNano() + nsecs, err := promutils.ParseTimeAt(s, currentTimestamp) if err != nil { return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err) } - return int64(secs * 1e9), true, nil + return nsecs, true, nil } diff --git a/deployment/docker/docker-compose-victorialogs.yml b/deployment/docker/docker-compose-victorialogs.yml index 2cf4fa782..9dbef797b 100644 --- a/deployment/docker/docker-compose-victorialogs.yml +++ b/deployment/docker/docker-compose-victorialogs.yml @@ -43,7 +43,7 @@ services: # storing logs and serving read queries. 
victorialogs: container_name: victorialogs - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs command: - "--storageDataPath=/vlogs" - "--httpListenAddr=:9428" diff --git a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml index f60a53e4e..979ec9dd7 100644 --- a/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: - -beat.uri=http://filebeat-victorialogs:5066 victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - victorialogs-filebeat-docker-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml index 19724774f..cbae6b505 100644 --- a/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml +++ b/deployment/docker/victorialogs/filebeat-syslog/docker-compose.yml @@ -13,7 +13,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - victorialogs-filebeat-syslog-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml index 9d92bc35b..23c967339 100644 --- a/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/fluentbit-docker/docker-compose.yml @@ -11,7 +11,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - victorialogs-fluentbit-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/logstash/docker-compose.yml b/deployment/docker/victorialogs/logstash/docker-compose.yml index 8abea6eae..db46a319d 100644 --- a/deployment/docker/victorialogs/logstash/docker-compose.yml +++ b/deployment/docker/victorialogs/logstash/docker-compose.yml @@ -14,7 +14,7 @@ services: - "5140:5140" victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - victorialogs-logstash-vl:/vlogs ports: diff --git a/deployment/docker/victorialogs/promtail/docker-compose.yml b/deployment/docker/victorialogs/promtail/docker-compose.yml index 1848e2447..8a628a83c 100644 --- a/deployment/docker/victorialogs/promtail/docker-compose.yml +++ b/deployment/docker/victorialogs/promtail/docker-compose.yml @@ -12,7 +12,7 @@ services: - "5140:5140" vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - victorialogs-promtail-docker:/vlogs ports: diff --git a/deployment/docker/victorialogs/vector-docker/docker-compose.yml b/deployment/docker/victorialogs/vector-docker/docker-compose.yml index 89259d829..50ac8e646 100644 --- a/deployment/docker/victorialogs/vector-docker/docker-compose.yml +++ b/deployment/docker/victorialogs/vector-docker/docker-compose.yml @@ -22,7 +22,7 @@ services: condition: service_healthy victorialogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + 
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - victorialogs-vector-docker-vl:/vlogs ports: diff --git a/deployment/logs-benchmark/docker-compose.yml b/deployment/logs-benchmark/docker-compose.yml index f16bbaf1d..9fd53f607 100644 --- a/deployment/logs-benchmark/docker-compose.yml +++ b/deployment/logs-benchmark/docker-compose.yml @@ -3,7 +3,7 @@ version: '3' services: # Run `make package-victoria-logs` to build victoria-logs image vlogs: - image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs + image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs volumes: - vlogs:/vlogs ports: diff --git a/docs/VictoriaLogs/CHANGELOG.md b/docs/VictoriaLogs/CHANGELOG.md index f53abd075..51ef8d8cc 100644 --- a/docs/VictoriaLogs/CHANGELOG.md +++ b/docs/VictoriaLogs/CHANGELOG.md @@ -19,6 +19,15 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta ## tip +## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs) + +Released at 2024-06-04 + +* FEATURE: add [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe) for unpacking [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). +* FEATURE: parse timestamps in [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) with nanosecond precision. +* FEATURE: return the last `N` matching logs from [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) with the maximum timestamps if `limit=N` query arg is passed to it. Previously a random subset of matching logs could be returned, which could complicate investigation of the returned logs. +* FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. + ## [v0.15.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.15.0-victorialogs) Released at 2024-05-30 diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md index 3a77732ab..82618d013 100644 --- a/docs/VictoriaLogs/LogsQL.md +++ b/docs/VictoriaLogs/LogsQL.md @@ -13,7 +13,10 @@ aliases: # LogsQL LogsQL is a simple yet powerful query language for [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/). -It provides the following features: +See [examples](https://docs.victoriametrics.com/victorialogs/logsql-examples/) and [tutorial](#logsql-tutorial) +in order to feel the language. + +LogsQL provides the following features: - Full-text search across [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). See [word filter](#word-filter), [phrase filter](#phrase-filter) and [prefix filter](#prefix-filter). @@ -194,7 +197,8 @@ _time:5m error | stats count() logs_with_error Finally, it is recommended reading [performance tips](#performance-tips). -Now you are familiar with LogsQL basics. Read [query syntax](#query-syntax) if you want to continue learning LogsQL. +Now you are familiar with LogsQL basics. See [LogsQL examples](https://docs.victoriametrics.com/victorialogs/logsql-examples/) and [query syntax](#query-syntax) +if you want to continue learning LogsQL. 
### Key concepts
@@ -287,7 +291,7 @@ _time:1h AND error
The following formats are supported for `_time` filter:
-- `_time:duration` matches logs with timestamps on the time range `(now-duration, now]`. Examples:
+- `_time:duration` matches logs with timestamps on the time range `(now-duration, now]`, where `duration` can have [these values](#duration-values). Examples:
- `_time:5m` - returns logs for the last 5 minutes
- `_time:2.5d15m42.345s` - returns logs for the last 2.5 days, 15 minutes and 42.345 seconds
- `_time:1y` - returns logs for the last year
@@ -1151,6 +1155,7 @@ LogsQL supports the following pipes:
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`drop_empty_fields`](#drop_empty_fields-pipe) drops [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
- [`extract`](#extract-pipe) extracts the specified text into the given log fields.
- [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax).
- [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -1168,8 +1173,9 @@ LogsQL supports the following pipes:
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`stats`](#stats-pipe) calculates various stats over the selected logs.
- [`uniq`](#uniq-pipe) returns unique log entries.
-- [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
-- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+- [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).

### copy pipe
@@ -1215,6 +1221,22 @@ See also:
- [`rename` pipe](#rename-pipe)
- [`fields` pipe](#fields-pipe)
+### drop_empty_fields pipe
+
+`| drop_empty_fields` pipe drops [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. It also skips log entries with zero non-empty fields.
+
+For example, the following query drops possible empty `email` field generated by [`extract` pipe](#extract-pipe) if the `foo` field doesn't contain an email:
+
+```logsql
+_time:5m | extract 'email: <email>,' from foo | drop_empty_fields
+```
+
+See also:
+
+- [`filter` pipe](#filter-pipe)
+- [`extract` pipe](#extract-pipe)
+
+
### extract pipe

`| extract "pattern" from field_name` [pipe](#pipes) allows extracting arbitrary text into output fields according to the [`pattern`](#format-for-extract-pipe-pattern) from the given
@@ -1291,6 +1313,7 @@ the corresponding matching substring to.
Matching starts from the first occurrence of the `text1` in the input text. If the `pattern` starts with `<field1>` and doesn't contain `text1`,
then the matching starts from the beginning of the input text. Matching is performed sequentially according to the `pattern`. If some `textX` isn't found
in the remaining input text, then the remaining named placeholders receive empty string values and the matching finishes prematurely.
+The empty string values can be dropped with [`drop_empty_fields` pipe](#drop_empty_fields-pipe).
Matching finishes successfully when `textN+1` is found in the input text. If the `pattern` ends with `<fieldN>` and doesn't contain `textN+1`,
then the `<fieldN>` matches the remaining input text.
@@ -2105,7 +2128,7 @@ See also:
### unpack_json pipe
-`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+`| unpack_json from field_name` [pipe](#pipes) unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
@@ -2172,6 +2195,7 @@ See also:
- [Conditional `unpack_json`](#conditional-unpack_json)
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
+- [`unpack_syslog` pipe](#unpack_syslog-pipe)
- [`extract` pipe](#extract-pipe)
- [`unroll` pipe](#unroll-pipe)
- [`pack_json` pipe](#pack_json-pipe)
@@ -2188,7 +2212,7 @@ _time:5m | unpack_json if (ip:"") from foo
```
### unpack_logfmt pipe
-`| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
+`| unpack_logfmt from field_name` [pipe](#pipes) unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names
with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
@@ -2235,7 +2259,7 @@ in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
_time:5m | extract ' ip=<ip>'
```
-If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON,
+If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from logfmt,
by adding `result_prefix "prefix_name"` to `unpack_logfmt`.
For example, the following query adds `foo_` prefix for all the unpacked fields from `foo` field:
@@ -2256,6 +2280,7 @@ See also:
- [Conditional unpack_logfmt](#conditional-unpack_logfmt)
- [`unpack_json` pipe](#unpack_json-pipe)
+- [`unpack_syslog` pipe](#unpack_syslog-pipe)
- [`extract` pipe](#extract-pipe)

#### Conditional unpack_logfmt
@@ -2269,6 +2294,86 @@ only if `ip` field in the current log entry isn't set or empty:
_time:5m | unpack_logfmt if (ip:"") from foo
```
+### unpack_syslog pipe
+
+`| unpack_syslog from field_name` [pipe](#pipes) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message
+from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following syslog formats:
+
+- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE`
+- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE`
+
+The following fields are unpacked:
+
+- `priority` - it is obtained from `PRI`.
+- `facility` - it is calculated as `PRI / 8`.
+- `severity` - it is calculated as `PRI % 8`.
+- `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164)
+  is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months.
+- `hostname`
+- `app_name`
+- `proc_id`
+- `msg_id`
+- `message`
+
+The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value
+according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value can then be parsed into separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe).
+
+For example, the following query unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
+across logs for the last 5 minutes:
+
+```logsql
+_time:5m | unpack_syslog from _msg
+```
+
+The `from _msg` part can be omitted when [syslog](https://en.wikipedia.org/wiki/Syslog) message is unpacked
+from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
+The following query is equivalent to the previous one:
+
+```logsql
+_time:5m | unpack_syslog
+```
+
+If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
+
+```logsql
+_time:5m | unpack_syslog keep_original_fields
+```
+
+If you want to make sure that the unpacked [syslog](https://en.wikipedia.org/wiki/Syslog) fields do not clash with the existing fields,
+then specify common prefix for all the fields extracted from syslog, by adding `result_prefix "prefix_name"` to `unpack_syslog`.
+For example, the following query adds `foo_` prefix for all the unpacked fields from `foo` field:
+
+```logsql
+_time:5m | unpack_syslog from foo result_prefix "foo_"
+```
+
+Performance tips:
+
+- It is better from performance and resource usage PoV to ingest parsed [syslog](https://en.wikipedia.org/wiki/Syslog) messages into VictoriaLogs
+  according to the [supported data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
+  instead of ingesting unparsed syslog lines into VictoriaLogs and then parsing them at query time with [`unpack_syslog` pipe](#unpack_syslog-pipe).
+
+- It is recommended using more specific [log filters](#filters) in order to reduce the number of log entries, which are passed to `unpack_syslog`.
+  See [general performance tips](#performance-tips) for details.
+
+See also:
+
+- [Conditional unpack_syslog](#conditional-unpack_syslog)
+- [`unpack_json` pipe](#unpack_json-pipe)
+- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
+- [`extract` pipe](#extract-pipe)
+
+#### Conditional unpack_syslog
+
+If the [`unpack_syslog` pipe](#unpack_syslog-pipe) mustn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
+then add `if (<filters>)` after `unpack_syslog`.
+The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks syslog message fields from `foo` field
+only if `hostname` field in the current log entry isn't set or empty:
+
+```logsql
+_time:5m | unpack_syslog if (hostname:"") from foo
+```
+
### unroll pipe

`| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, `fieldN`
diff --git a/docs/VictoriaLogs/QuickStart.md b/docs/VictoriaLogs/QuickStart.md
index 809ce2690..85ad0a8b8 100644
--- a/docs/VictoriaLogs/QuickStart.md
+++ b/docs/VictoriaLogs/QuickStart.md
@@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:

```sh
-curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.15.0-victorialogs/victoria-logs-linux-amd64-v0.15.0-victorialogs.tar.gz
-tar xzf victoria-logs-linux-amd64-v0.15.0-victorialogs.tar.gz
+curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.16.0-victorialogs/victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz
+tar xzf victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz
./victoria-logs-prod
```
@@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container:

```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
-  docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
+  docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
```

See also:
diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md
index 6eae9414c..4737c30a6 100644
--- a/docs/VictoriaLogs/README.md
+++ b/docs/VictoriaLogs/README.md
@@ -10,7 +10,7 @@ aliases:
VictoriaLogs is [open source](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/victoria-logs) user-friendly database for logs
from [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/).

-VictoriaLogs provides the following key features:
+VictoriaLogs provides the following features:

- VictoriaLogs can accept logs from popular log collectors. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/).
- VictoriaLogs is much easier to set up and operate compared to Elasticsearch and Grafana Loki.
diff --git a/docs/VictoriaLogs/logsql-examples.md b/docs/VictoriaLogs/logsql-examples.md
new file mode 100644
index 000000000..740dc5268
--- /dev/null
+++ b/docs/VictoriaLogs/logsql-examples.md
@@ -0,0 +1,399 @@
+---
+sort: 100
+weight: 100
+title: LogsQL examples
+menu:
+  docs:
+    parent: "victorialogs"
+    weight: 100
+---
+
+# LogsQL examples
+
+## How to select recently ingested logs?
+
+[Run](https://docs.victoriametrics.com/victorialogs/querying/) the following query:
+
+```logsql
+_time:5m
+```
+
+It returns logs over the last 5 minutes by using [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter).
+The logs are returned in arbitrary order because of performance reasons.
+Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) to the query if you need sorting
+the returned logs by some field (usually [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field)):
+
+```logsql
+_time:5m | sort by (_time)
+```
+
+If the number of returned logs is too big, it may be limited with the [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe).
+For example, the following query returns 10 most recent logs, which were ingested during the last 5 minutes:
+
+```logsql
+_time:5m | sort by (_time desc) | limit 10
+```
+
+See also:
+
+- [How to count the number of matching logs](#how-to-count-the-number-of-matching-logs)
+
+## How to select logs with the given word in log message?
+
+Just put the needed [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the query.
+For example, the following query returns all the logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
+in [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
+
+```logsql
+error
+```
+
+If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
+for limiting the time range for the selected logs. For example, the following query returns logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
+over the last hour:
+
+```logsql
+error _time:1h
+```
+
+If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
+to the query. For example, the following query selects logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word),
+which do not contain `kubernetes` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), over the last hour:
+
+```logsql
+error !kubernetes _time:1h
+```
+
+The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
+for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
For example, the following query
+sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
+
+```logsql
+error _time:1h | sort by (_time)
+```
+
+See also:
+
+- [How to select logs with all the given words in log message?](#how-to-select-logs-with-all-the-given-words-in-log-message)
+- [How to select logs with some of the given words in log message?](#how-to-select-logs-with-some-of-the-given-words-in-log-message)
+- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
+- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
+- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
+- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
+- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
+
+
+## How to skip logs with the given word in log message?
+
+Use [`NOT` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, the following query returns all the logs
+without the `INFO` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
+
+```logsql
+!INFO
+```
+
+If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
+for limiting the time range for the selected logs. For example, the following query returns matching logs over the last hour:
+
+```logsql
+!INFO _time:1h
+```
+
+If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
+to the query. For example, the following query selects logs without `INFO` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word),
+which contain `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), over the last hour:
+
+```logsql
+!INFO error _time:1h
+```
+
+The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
+for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query
+sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
+
+```logsql
+!INFO _time:1h | sort by (_time)
+```
+
+See also:
+
+- [How to select logs with all the given words in log message?](#how-to-select-logs-with-all-the-given-words-in-log-message)
+- [How to select logs with some of the given words in log message?](#how-to-select-logs-with-some-of-the-given-words-in-log-message)
+- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
+- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
+- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
+- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
+
+
+## How to select logs with all the given words in log message?
+
+Just enumerate the needed [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the query, by delimiting them with whitespace.
+For example, the following query selects logs containing both `error` and `kubernetes` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) +in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): + +```logsql +error kubernetes +``` + +This query uses [`AND` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). + +If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) +for limiting the time range for the selected logs. For example, the following query returns matching logs over the last hour: + +```logsql +error kubernetes _time:1h +``` + +If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) +to the query. For example, the following query selects logs with `error` and `kubernetes` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) +from [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `container="my-app"` field, over the last hour: + +```logsql +error kubernetes _stream:{container="my-app"} _time:1h +``` + +The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) +for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query +sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field): + +```logsql +error kubernetes _time:1h | sort by (_time) +``` + +See also: + +- [How to select logs with some of given words in log message?](#how-to-select-logs-with-some-of-the-given-words-in-log-message) +- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message) +- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter) +- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter) +- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter) +- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter) + + +## How to select logs with some of the given words in log message? + +Put the needed [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) into `(...)`, by delimiting them with ` or `. +For example, the following query selects logs with `error`, `ERROR` or `Error` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) +in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): + +```logsql +(error or ERROR or Error) +``` + +This query uses [`OR` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). + +If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) +for limiting the time range for the selected logs. For example, the following query returns matching logs over the last hour: + +```logsql +(error or ERROR or Error) _time:1h +``` + +If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters) +to the query. 
For example, the following query selects logs with `error`, `ERROR` or `Error` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word),
+which do not contain `kubernetes` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), over the last hour:
+
+```logsql
+(error or ERROR or Error) !kubernetes _time:1h
+```
+
+The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
+for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query
+sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
+
+```logsql
+(error or ERROR or Error) _time:1h | sort by (_time)
+```
+
+See also:
+
+- [How to select logs with all the given words in log message?](#how-to-select-logs-with-all-the-given-words-in-log-message)
+- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
+- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
+- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
+- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
+- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
+
+
+## How to select logs from the given application instance?
+
+Make sure the application is properly configured with [stream-level log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
+Then just use [`_stream` filter](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter) for selecting logs for the given application instance.
+For example, if the application contains `job="app-42"` and `instance="host-123:5678"` [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields),
+then the following query selects all the logs from this application:
+
+```logsql
+_stream:{job="app-42",instance="host-123:5678"}
+```
+
+If the number of returned logs is too big, it is recommended adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
+to the query in order to reduce the number of matching logs. For example, the following query returns logs for the given application for the last day:
+
+```logsql
+_stream:{job="app-42",instance="host-123:5678"} _time:1d
+```
+
+If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
+to the query. For example, the following query selects logs from the given [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields),
+which contain `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field),
+over the last day:
+
+```logsql
+_stream:{job="app-42",instance="host-123:5678"} error _time:1d
+```
+
+The logs are returned in arbitrary order because of performance reasons. Use [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
+for sorting the returned logs by the needed fields.
For example, the following query sorts the selected logs
+by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
+
+```logsql
+_stream:{job="app-42",instance="host-123:5678"} _time:1d | sort by (_time)
+```
+
+See also:
+
+- [How to determine applications with the most logs?](#how-to-determine-applications-with-the-most-logs)
+- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
+
+
+## How to count the number of matching logs?
+
+Use [`count()` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count-stats). For example, the following query returns
+the number of results returned by `your_query_here`:
+
+```logsql
+your_query_here | count()
+```
+
+## How to determine applications with the most logs?
+
+[Run](https://docs.victoriametrics.com/victorialogs/querying/) the following query:
+
+```logsql
+_time:5m | stats by (_stream) count() as logs | sort by (logs desc) | limit 10
+```
+
+This query returns top 10 application instances (aka [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields))
+with the most logs over the last 5 minutes.
+
+This query uses the following [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) features:
+
+- [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) for selecting logs on the given time range (5 minutes in the query above).
+- [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for calculating the number of logs
+  per each [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). The [`count` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count-stats)
+  is used for calculating the needed stats.
+- [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order.
+- [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10.
+
+See also:
+
+- [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation)
+- [How to calculate the number of logs per the given interval?](#how-to-calculate-the-number-of-logs-per-the-given-interval)
+- [How to select logs from the given application instance?](#how-to-select-logs-from-the-given-application-instance)
+
+
+## How to parse JSON inside log message?
+
+It is better from performance and resource usage PoV to avoid storing JSON inside [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
+It is recommended storing individual JSON fields as log fields instead according to [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
+
+If you have to store JSON inside log message or inside any other [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
+then the stored JSON can be parsed during query time via [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe).
+For example, the following query unpacks JSON from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
+across all the logs for the last 5 minutes:
+
+```logsql
+_time:5m | unpack_json
+```
+
+If you need to parse JSON array, then take a look at [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe).
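+
+As a concrete illustration, assuming a hypothetical log message `{"level":"error","ip":"1.2.3.4"}` stored in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field), the following query makes `level` and `ip` available to further [pipes](https://docs.victoriametrics.com/victorialogs/logsql/#pipes) as regular [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model):
+
+```logsql
+_time:5m | unpack_json | fields _time, level, ip
+```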
+
+
+## How to extract some data from text log message?
+
+Use [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) or [`extract_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) pipe.
+For example, the following query extracts `username` and `user_id` fields from text [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
+
+```logsql
+_time:5m | extract "username=<username>, user_id=<user_id>,"
+```
+
+See also:
+
+- [Replacing substrings in text fields](https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe)
+
+
+## How to filter out data after stats calculation?
+
+Use [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe). For example, the following query
+returns only [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with more than 1000 logs
+over the last 5 minutes:
+
+```logsql
+_time:5m | stats by (_stream) count() rows | filter rows:>1000
+```
+
+## How to calculate the number of logs per the given interval?
+
+Use [`stats` by time bucket](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-time-buckets). For example, the following query
+returns per-hour number of logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last day:
+
+```logsql
+_time:1d error | stats by (_time:1h) count() rows | sort by (_time)
+```
+
+This query uses [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) in order to sort per-hour stats
+by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).
+
+
+## How to calculate the number of logs per every value of the given field?
+
+Use [`stats` by field](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields). For example, the following query
+calculates the number of logs per `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for logs over the last 5 minutes:
+
+```logsql
+_time:5m | stats by (level) count() rows
+```
+
+An alternative is to use [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe):
+
+```logsql
+_time:5m | field_values level
+```
+
+## How to get unique values for the given field?
+
+Use [`uniq` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe). For example, the following query returns unique values for the `ip` field
+over logs for the last 5 minutes:
+
+```logsql
+_time:5m | uniq by (ip)
+```
+
+## How to get unique sets of values for the given fields?
+
+Use [`uniq` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe). For example, the following query returns unique sets for (`host`, `path`) fields
+over logs for the last 5 minutes:
+
+```logsql
+_time:5m | uniq by (host, path)
+```
+
+## How to return last N logs for the given query?
+
+Use [`sort` pipe with limit](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe).
For example, the following query returns the last 10 logs with the `error` +[word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) +over the logs for the last 5 minutes: + +```logsql +_time:5m error | sort by (_time desc) limit 10 +``` + +It sorts the matching logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) in descending order and then selects +the first 10 logs with the highest values for the `_time` field. + +If the query is sent to [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs), then `limit=N` query arg +can be passed to it in order to return up to `N` latest log entries. For example, the following command returns up to 10 latest log entries with the `error` word: + +```sh +curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10' +``` diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md index 3df0c61d0..25df7af8f 100644 --- a/docs/VictoriaLogs/querying/README.md +++ b/docs/VictoriaLogs/querying/README.md @@ -58,12 +58,14 @@ By default the `/select/logsql/query` returns all the log entries matching the g - By closing the response stream at any time. VictoriaLogs stops query execution and frees all the resources occupied by the request as soon as it detects closed client connection. So it is safe running [`*` query](https://docs.victoriametrics.com/victorialogs/logsql/#any-value-filter), which selects all the logs, even if trillions of logs are stored in VictoriaLogs. -- By specifying the maximum number of log entries, which can be returned in the response via `limit` query arg. For example, the following request returns - up to 10 matching log entries: +- By specifying the maximum number of log entries, which can be returned in the response via `limit` query arg. For example, the following command returns + up to 10 most recently added log entries with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) + in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): ```sh curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10' ``` -- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example: +- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example, the following command returns up to 10 **random** log entries + with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field): ```sh curl http://localhost:9428/select/logsql/query -d 'query=error | limit 10' ``` @@ -87,8 +89,11 @@ This allows post-processing the returned lines at the client side with the usual without worrying about resource usage at VictoriaLogs side. See [these docs](#command-line) for more details. The returned lines aren't sorted by default, since sorting disables the ability to send matching log entries to response stream as soon as they are found. -Query results can be sorted either at VictoriaLogs side via [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) -or at client side with the usual `sort` command according to [these docs](#command-line). 
+Query results can be sorted in the following ways: + +- By passing `limit=N` query arg to `/select/logsql/query`. The up to `N` most recent matching log entries are returned in the response. +- By adding [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) to the query. +- By using Unix `sort` command at client side according to [these docs](#command-line). By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried. If you need querying other tenant, then specify it via `AccountID` and `ProjectID` http request headers. For example, the following query searches diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go index 1f1d312d4..a4858f3ba 100644 --- a/lib/logstorage/block_result.go +++ b/lib/logstorage/block_result.go @@ -1226,7 +1226,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string { buf := br.a.b bufLen := len(buf) - buf = marshalDuration(buf, nsecs) + buf = marshalDurationString(buf, nsecs) br.a.b = buf return bytesutil.ToUnsafeString(buf[bufLen:]) } diff --git a/lib/logstorage/filter_and.go b/lib/logstorage/filter_and.go index 59d623b2e..513d3e23b 100644 --- a/lib/logstorage/filter_and.go +++ b/lib/logstorage/filter_and.go @@ -114,16 +114,14 @@ func (fa *filterAnd) getByFieldTokens() []fieldTokens { func (fa *filterAnd) initByFieldTokens() { m := make(map[string]map[string]struct{}) - byFieldFilters := make(map[string]int) var fieldNames []string mergeFieldTokens := func(fieldName string, tokens []string) { - fieldName = getCanonicalColumnName(fieldName) - byFieldFilters[fieldName]++ if len(tokens) == 0 { return } + fieldName = getCanonicalColumnName(fieldName) mTokens, ok := m[fieldName] if !ok { fieldNames = append(fieldNames, fieldName) @@ -165,11 +163,6 @@ func (fa *filterAnd) initByFieldTokens() { var byFieldTokens []fieldTokens for _, fieldName := range fieldNames { - if byFieldFilters[fieldName] < 2 { - // It is faster to perform bloom filter match inline when visiting the corresponding column - continue - } - mTokens := m[fieldName] tokens := make([]string, 0, len(mTokens)) for token := range mTokens { diff --git a/lib/logstorage/filter_noop.go b/lib/logstorage/filter_noop.go index 942c10b41..2d3e38035 100644 --- a/lib/logstorage/filter_noop.go +++ b/lib/logstorage/filter_noop.go @@ -5,7 +5,7 @@ type filterNoop struct { } func (fn *filterNoop) String() string { - return "" + return "*" } func (fn *filterNoop) updateNeededFields(_ fieldsSet) { diff --git a/lib/logstorage/filter_or.go b/lib/logstorage/filter_or.go index 040c74680..fa5aab021 100644 --- a/lib/logstorage/filter_or.go +++ b/lib/logstorage/filter_or.go @@ -127,16 +127,14 @@ func (fo *filterOr) getByFieldTokens() []fieldTokens { func (fo *filterOr) initByFieldTokens() { m := make(map[string][][]string) - byFieldFilters := make(map[string]int) var fieldNames []string mergeFieldTokens := func(fieldName string, tokens []string) { - fieldName = getCanonicalColumnName(fieldName) - byFieldFilters[fieldName]++ if len(tokens) == 0 { return } + fieldName = getCanonicalColumnName(fieldName) if _, ok := m[fieldName]; !ok { fieldNames = append(fieldNames, fieldName) } @@ -173,11 +171,6 @@ func (fo *filterOr) initByFieldTokens() { var byFieldTokens []fieldTokens for _, fieldName := range fieldNames { - if byFieldFilters[fieldName] < 2 { - // It is faster to perform bloom filter match inline when visiting the corresponding column - continue - } - commonTokens := 
getCommonTokens(m[fieldName]) if len(commonTokens) > 0 { byFieldTokens = append(byFieldTokens, fieldTokens{ diff --git a/lib/logstorage/logfmt_parser.go b/lib/logstorage/logfmt_parser.go index 31901cd3c..dc9ef1b85 100644 --- a/lib/logstorage/logfmt_parser.go +++ b/lib/logstorage/logfmt_parser.go @@ -15,6 +15,10 @@ func (p *logfmtParser) reset() { } func (p *logfmtParser) addField(name, value string) { + name = strings.TrimSpace(name) + if name == "" && value == "" { + return + } p.fields = append(p.fields, Field{ Name: name, Value: value, @@ -22,16 +26,24 @@ func (p *logfmtParser) addField(name, value string) { } func (p *logfmtParser) parse(s string) { + p.reset() for { // Search for field name - n := strings.IndexByte(s, '=') + n := strings.IndexAny(s, "= ") if n < 0 { - // field name couldn't be read + // empty value + p.addField(s, "") return } - name := strings.TrimSpace(s[:n]) + name := s[:n] + ch := s[n] s = s[n+1:] + if ch == ' ' { + // empty value + p.addField(name, "") + continue + } if len(s) == 0 { p.addField(name, "") return diff --git a/lib/logstorage/logfmt_parser_test.go b/lib/logstorage/logfmt_parser_test.go index 60161271e..66a1e6552 100644 --- a/lib/logstorage/logfmt_parser_test.go +++ b/lib/logstorage/logfmt_parser_test.go @@ -12,7 +12,7 @@ func TestLogfmtParser(t *testing.T) { defer putLogfmtParser(p) p.parse(s) - result := marshalFieldsToJSON(nil, p.fields) + result := MarshalFieldsToJSON(nil, p.fields) if string(result) != resultExpected { t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected) } @@ -22,9 +22,9 @@ func TestLogfmtParser(t *testing.T) { f(`foo=bar`, `{"foo":"bar"}`) f(`foo="bar=baz x=y"`, `{"foo":"bar=baz x=y"}`) f(`foo=`, `{"foo":""}`) + f(`foo`, `{"foo":""}`) + f(`foo bar`, `{"foo":"","bar":""}`) + f(`foo bar=baz`, `{"foo":"","bar":"baz"}`) f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`) - - // errors - f(`foo`, `{}`) - f(`foo=bar baz=x z qwe`, `{"foo":"bar","baz":"x"}`) + f(` foo=bar baz=x =z qwe`, `{"foo":"bar","baz":"x","":"z","qwe":""}`) } diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go index 87e4ecc00..2f58a9111 100644 --- a/lib/logstorage/parser.go +++ b/lib/logstorage/parser.go @@ -243,8 +243,8 @@ func (q *Query) String() string { func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { { // add 'stats by (_time:step offset off, fields) count() hits' - stepStr := string(marshalDuration(nil, step)) - offsetStr := string(marshalDuration(nil, off)) + stepStr := string(marshalDurationString(nil, step)) + offsetStr := string(marshalDurationString(nil, off)) byFieldsStr := "_time:" + stepStr + " offset " + offsetStr for _, f := range fields { byFieldsStr += ", " + quoteTokenIfNeeded(f) @@ -279,6 +279,38 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) { } } +// Clone returns a copy of q. +func (q *Query) Clone() *Query { + qStr := q.String() + qCopy, err := ParseQuery(qStr) + if err != nil { + logger.Panicf("BUG: cannot parse %q: %s", qStr, err) + } + return qCopy +} + +// CanReturnLastNResults returns true if time range filter at q can be adjusted for returning the last N results. +func (q *Query) CanReturnLastNResults() bool { + for _, p := range q.pipes { + switch p.(type) { + case *pipeFieldNames, + *pipeFieldValues, + *pipeLimit, + *pipeOffset, + *pipeSort, + *pipeStats, + *pipeUniq: + return false + } + } + return true +} + +// GetFilterTimeRange returns filter time range for the given q. 
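+//
+// The returned values are nanosecond timestamps; if q contains no `_time` filter,
+// the whole int64 range is returned.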
+func (q *Query) GetFilterTimeRange() (int64, int64) { + return getFilterTimeRange(q.f) +} + // AddTimeFilter adds global filter _time:[start ... end] to q. func (q *Query) AddTimeFilter(start, end int64) { startStr := marshalTimestampRFC3339NanoString(nil, start) @@ -1394,12 +1426,12 @@ func parseFilterTime(lex *lexer) (*filterTime, error) { sLower := strings.ToLower(s) if sLower == "now" || startsWithYear(s) { // Parse '_time:YYYY-MM-DD', which transforms to '_time:[YYYY-MM-DD, YYYY-MM-DD+1)' - t, err := promutils.ParseTimeAt(s, float64(lex.currentTimestamp)/1e9) + nsecs, err := promutils.ParseTimeAt(s, lex.currentTimestamp) if err != nil { return nil, fmt.Errorf("cannot parse _time filter: %w", err) } // Round to milliseconds - startTime := int64(math.Round(t*1e3)) * 1e6 + startTime := nsecs endTime := getMatchingEndTime(startTime, s) ft := &filterTime{ minTimestamp: startTime, @@ -1549,12 +1581,11 @@ func parseTime(lex *lexer) (int64, string, error) { if err != nil { return 0, "", err } - t, err := promutils.ParseTimeAt(s, float64(lex.currentTimestamp)/1e9) + nsecs, err := promutils.ParseTimeAt(s, lex.currentTimestamp) if err != nil { return 0, "", err } - // round to milliseconds - return int64(math.Round(t*1e3)) * 1e6, s, nil + return nsecs, s, nil } func quoteStringTokenIfNeeded(s string) string { diff --git a/lib/logstorage/parser_test.go b/lib/logstorage/parser_test.go index 702db4678..78ee07268 100644 --- a/lib/logstorage/parser_test.go +++ b/lib/logstorage/parser_test.go @@ -1031,7 +1031,7 @@ func TestParseQuerySuccess(t *testing.T) { sum(duration) if (host:in('foo.com', 'bar.com') and path:/foobar) as bar`, `* | stats by (_time:1d offset -2h, f2) count(*) if (is_admin:true or "foo bar"*) as foo, sum(duration) if (host:in(foo.com,bar.com) path:"/foobar") as bar`) f(`* | stats count(x) if (error ip:in(_time:1d | fields ip)) rows`, `* | stats count(x) if (error ip:in(_time:1d | fields ip)) as rows`) - f(`* | stats count() if () rows`, `* | stats count(*) if () as rows`) + f(`* | stats count() if () rows`, `* | stats count(*) if (*) as rows`) // sort pipe f(`* | sort`, `* | sort`) @@ -1832,3 +1832,72 @@ func TestQueryGetNeededColumns(t *testing.T) { f(`* | unroll (a, b) | count() r1`, `a,b`, ``) f(`* | unroll if (q:w p:a) (a, b) | count() r1`, `a,b,p,q`, ``) } + +func TestQueryClone(t *testing.T) { + f := func(qStr string) { + t.Helper() + + q, err := ParseQuery(qStr) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", qStr, err) + } + qCopy := q.Clone() + qCopyStr := qCopy.String() + if qStr != qCopyStr { + t.Fatalf("unexpected cloned query\ngot\n%s\nwant\n%s", qCopyStr, qStr) + } + } + + f("*") + f("error") + f("_time:5m error | fields foo, bar") + f("ip:in(foo | fields user_ip) bar | stats by (x:1h, y) count(*) if (user_id:in(q:w | fields abc)) as ccc") +} + +func TestQueryGetFilterTimeRange(t *testing.T) { + f := func(qStr string, startExpected, endExpected int64) { + t.Helper() + + q, err := ParseQuery(qStr) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", qStr, err) + } + start, end := q.GetFilterTimeRange() + if start != startExpected || end != endExpected { + t.Fatalf("unexpected filter time range; got [%d, %d]; want [%d, %d]", start, end, startExpected, endExpected) + } + } + + f("*", -9223372036854775808, 9223372036854775807) + f("_time:2024-05-31T10:20:30.456789123Z", 1717150830456789123, 1717150830456789123) + f("_time:2024-05-31", 1717113600000000000, 1717199999999999999) +} + +func TestQueryCanReturnLastNResults(t *testing.T) { + f := func(qStr string, 
+func TestQueryCanReturnLastNResults(t *testing.T) { + f := func(qStr string, resultExpected bool) { + t.Helper() + + q, err := ParseQuery(qStr) + if err != nil { + t.Fatalf("cannot parse [%s]: %s", qStr, err) + } + result := q.CanReturnLastNResults() + if result != resultExpected { + t.Fatalf("unexpected result for CanReturnLastNResults(%q); got %v; want %v", qStr, result, resultExpected) + } + } + + f("*", true) + f("error", true) + f("error | fields foo | filter foo:bar", true) + f("error | extract 'bar'", true) + f("* | rm x", true) + f("* | stats count() rows", false) + f("* | sort by (x)", false) + f("* | limit 10", false) + f("* | offset 10", false) + f("* | uniq (x)", false) + f("* | field_names", false) + f("* | field_values x", false) +} diff --git a/lib/logstorage/pipe.go b/lib/logstorage/pipe.go index 32a72d0dc..ff2178025 100644 --- a/lib/logstorage/pipe.go +++ b/lib/logstorage/pipe.go @@ -106,6 +106,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err) } return pd, nil + case lex.isKeyword("drop_empty_fields"): + pd, err := parsePipeDropEmptyFields(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'drop_empty_fields' pipe: %w", err) + } + return pd, nil case lex.isKeyword("extract"): pe, err := parsePipeExtract(lex) if err != nil { @@ -220,6 +226,12 @@ func parsePipe(lex *lexer) (pipe, error) { return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err) } return pu, nil + case lex.isKeyword("unpack_syslog"): + pu, err := parsePipeUnpackSyslog(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'unpack_syslog' pipe: %w", err) + } + return pu, nil case lex.isKeyword("unroll"): pu, err := parsePipeUnroll(lex) if err != nil { diff --git a/lib/logstorage/pipe_drop_empty_fields.go b/lib/logstorage/pipe_drop_empty_fields.go new file mode 100644 index 000000000..c6b1594a9 --- /dev/null +++ b/lib/logstorage/pipe_drop_empty_fields.go @@ -0,0 +1,223 @@ +package logstorage + +import ( + "fmt" + "unsafe" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" +) + +// pipeDropEmptyFields processes '| drop_empty_fields ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe +type pipeDropEmptyFields struct { +} + +func (pd *pipeDropEmptyFields) String() string { + return "drop_empty_fields" +} + +func (pd *pipeDropEmptyFields) optimize() { + // nothing to do +} + +func (pd *pipeDropEmptyFields) hasFilterInWithQuery() bool { + return false +} + +func (pd *pipeDropEmptyFields) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { + return pd, nil +} + +func (pd *pipeDropEmptyFields) updateNeededFields(_, _ fieldsSet) { + // nothing to do +} + +func (pd *pipeDropEmptyFields) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + return &pipeDropEmptyFieldsProcessor{ + ppNext: ppNext, + + shards: make([]pipeDropEmptyFieldsProcessorShard, workersCount), + } +} + +type pipeDropEmptyFieldsProcessor struct { + ppNext pipeProcessor + + shards []pipeDropEmptyFieldsProcessorShard +} + +type pipeDropEmptyFieldsProcessorShard struct { + pipeDropEmptyFieldsProcessorShardNopad + + // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0.
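	// Illustration of the padding arithmetic (not part of the upstream comment):
	// if the Nopad struct below happened to be 72 bytes, the array length would
	// be 128 - 72%128 = 56, rounding each shard up to 128 bytes so that adjacent
	// shards in the slice start 128 bytes apart and do not fall into the same
	// cache line. unsafe.Sizeof is evaluated at compile time, so this is a
	// valid constant array length.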
+ _ [128 - unsafe.Sizeof(pipeDropEmptyFieldsProcessorShardNopad{})%128]byte +} + +type pipeDropEmptyFieldsProcessorShardNopad struct { + columnValues [][]string + fields []Field + + wctx pipeDropEmptyFieldsWriteContext +} + +func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) { + if len(br.timestamps) == 0 { + return + } + + shard := &pdp.shards[workerID] + + cs := br.getColumns() + + shard.columnValues = slicesutil.SetLength(shard.columnValues, len(cs)) + columnValues := shard.columnValues + for i, c := range cs { + columnValues[i] = c.getValues(br) + } + + if !hasEmptyValues(columnValues) { + // Fast path - just write br to ppNext, since it has no empty values. + pdp.ppNext.writeBlock(workerID, br) + return + } + + // Slow path - drop fields with empty values + shard.wctx.init(workerID, pdp.ppNext) + + fields := shard.fields + for rowIdx := range br.timestamps { + fields = fields[:0] + for i, values := range columnValues { + v := values[rowIdx] + if v == "" { + continue + } + fields = append(fields, Field{ + Name: cs[i].name, + Value: values[rowIdx], + }) + } + shard.wctx.writeRow(fields) + } + shard.fields = fields + + shard.wctx.flush() +} + +func (pdp *pipeDropEmptyFieldsProcessor) flush() error { + return nil +} + +type pipeDropEmptyFieldsWriteContext struct { + workerID uint + ppNext pipeProcessor + + rcs []resultColumn + br blockResult + + // rowsCount is the number of rows in the current block + rowsCount int + + // valuesLen is the total length of values in the current block + valuesLen int +} + +func (wctx *pipeDropEmptyFieldsWriteContext) reset() { + wctx.workerID = 0 + wctx.ppNext = nil + + rcs := wctx.rcs + for i := range rcs { + rcs[i].reset() + } + wctx.rcs = rcs[:0] + + wctx.rowsCount = 0 + wctx.valuesLen = 0 +} + +func (wctx *pipeDropEmptyFieldsWriteContext) init(workerID uint, ppNext pipeProcessor) { + wctx.reset() + + wctx.workerID = workerID + wctx.ppNext = ppNext +} + +func (wctx *pipeDropEmptyFieldsWriteContext) writeRow(fields []Field) { + if len(fields) == 0 { + // skip rows without non-empty fields + return + } + + rcs := wctx.rcs + + areEqualColumns := len(rcs) == len(fields) + if areEqualColumns { + for i, f := range fields { + if rcs[i].name != f.Name { + areEqualColumns = false + break + } + } + } + if !areEqualColumns { + // send the current block to ppNext and construct a block with new set of columns + wctx.flush() + + rcs = wctx.rcs[:0] + for _, f := range fields { + rcs = appendResultColumnWithName(rcs, f.Name) + } + wctx.rcs = rcs + } + + for i, f := range fields { + v := f.Value + rcs[i].addValue(v) + wctx.valuesLen += len(v) + } + + wctx.rowsCount++ + if wctx.valuesLen >= 1_000_000 { + wctx.flush() + } +} + +func (wctx *pipeDropEmptyFieldsWriteContext) flush() { + rcs := wctx.rcs + + wctx.valuesLen = 0 + + // Flush rcs to ppNext + br := &wctx.br + br.setResultColumns(rcs, wctx.rowsCount) + wctx.rowsCount = 0 + wctx.ppNext.writeBlock(wctx.workerID, br) + br.reset() + for i := range rcs { + rcs[i].resetValues() + } +} + +func parsePipeDropEmptyFields(lex *lexer) (*pipeDropEmptyFields, error) { + if !lex.isKeyword("drop_empty_fields") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "drop_empty_fields") + } + lex.nextToken() + + pd := &pipeDropEmptyFields{} + + return pd, nil +} + +func hasEmptyValues(columnValues [][]string) bool { + for _, values := range columnValues { + for _, v := range values { + if v == "" { + return true + } + } + } + return false +} diff --git 
a/lib/logstorage/pipe_drop_empty_fields_test.go b/lib/logstorage/pipe_drop_empty_fields_test.go new file mode 100644 index 000000000..fa2422f82 --- /dev/null +++ b/lib/logstorage/pipe_drop_empty_fields_test.go @@ -0,0 +1,94 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeDropEmptyFieldsSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`drop_empty_fields`) +} + +func TestParsePipeDropEmptyFieldsFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`drop_empty_fields foo`) +} + +func TestPipeDropEmptyFields(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + f(`drop_empty_fields`, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + }, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + }) + f(`drop_empty_fields`, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + { + {"a", "foo1"}, + {"b", ""}, + {"c", "baz1"}, + }, + { + {"a", ""}, + {"b", "bar2"}, + }, + { + {"a", ""}, + {"b", ""}, + {"c", ""}, + }, + }, [][]Field{ + { + {"a", "foo"}, + {"b", "bar"}, + {"c", "baz"}, + }, + { + {"a", "foo1"}, + {"c", "baz1"}, + }, + { + {"b", "bar2"}, + }, + }) +} + +func TestPipeDropEmptyFieldsUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f(`drop_empty_fields`, "*", "", "*", "") + + // non-empty unneeded fields + f(`drop_empty_fields`, "*", "f1,f2", "*", "f1,f2") + + // non-empty needed fields + f(`drop_empty_fields`, "x,y", "", "x,y", "") +} diff --git a/lib/logstorage/pipe_pack_json.go b/lib/logstorage/pipe_pack_json.go index af44c8750..0a1686f04 100644 --- a/lib/logstorage/pipe_pack_json.go +++ b/lib/logstorage/pipe_pack_json.go @@ -126,7 +126,7 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) { } bufLen := len(buf) - buf = marshalFieldsToJSON(buf, fields) + buf = MarshalFieldsToJSON(buf, fields) v := bytesutil.ToUnsafeString(buf[bufLen:]) shard.rc.addValue(v) } diff --git a/lib/logstorage/pipe_unpack.go b/lib/logstorage/pipe_unpack.go index fe2e36d78..fbc4589f1 100644 --- a/lib/logstorage/pipe_unpack.go +++ b/lib/logstorage/pipe_unpack.go @@ -57,7 +57,6 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepO } type fieldsUnpackerContext struct { - workerID uint fieldPrefix string fields []Field @@ -65,7 +64,6 @@ type fieldsUnpackerContext struct { } func (uctx *fieldsUnpackerContext) reset() { - uctx.workerID = 0 uctx.fieldPrefix = "" uctx.resetFields() uctx.a.reset() @@ -76,10 +74,9 @@ func (uctx *fieldsUnpackerContext) resetFields() { uctx.fields = uctx.fields[:0] } -func (uctx *fieldsUnpackerContext) init(workerID uint, fieldPrefix string) { +func (uctx *fieldsUnpackerContext) init(fieldPrefix string) { uctx.reset() - uctx.workerID = workerID uctx.fieldPrefix = fieldPrefix } @@ -157,7 +154,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) { shard := &pup.shards[workerID] shard.wctx.init(workerID, pup.ppNext, pup.keepOriginalFields, pup.skipEmptyResults, br) - shard.uctx.init(workerID, pup.fieldPrefix) + shard.uctx.init(pup.fieldPrefix) bm := &shard.bm 
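	// Note on the refactor above: uctx.init now takes only the field prefix.
	// The workerID parameter and the matching fieldsUnpackerContext field were
	// removed because nothing read them after assignment; per-worker state is
	// already selected via shard := &pup.shards[workerID].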
bm.init(len(br.timestamps)) @@ -234,6 +231,7 @@ func (wctx *pipeUnpackWriteContext) reset() { wctx.workerID = 0 wctx.ppNext = nil wctx.keepOriginalFields = false + wctx.skipEmptyResults = false wctx.brSrc = nil wctx.csSrc = nil diff --git a/lib/logstorage/pipe_unpack_logfmt_test.go b/lib/logstorage/pipe_unpack_logfmt_test.go index 90720118d..56099de56 100644 --- a/lib/logstorage/pipe_unpack_logfmt_test.go +++ b/lib/logstorage/pipe_unpack_logfmt_test.go @@ -191,17 +191,6 @@ func TestPipeUnpackLogfmt(t *testing.T) { }, }) - // single row, unpack from non-json field - f("unpack_logfmt from x", [][]Field{ - { - {"x", `foobar`}, - }, - }, [][]Field{ - { - {"x", `foobar`}, - }, - }) - // single row, unpack from non-logfmt f("unpack_logfmt from x", [][]Field{ { @@ -210,6 +199,7 @@ func TestPipeUnpackLogfmt(t *testing.T) { }, [][]Field{ { {"x", `foobar`}, + {"foobar", ""}, }, }) diff --git a/lib/logstorage/pipe_unpack_syslog.go b/lib/logstorage/pipe_unpack_syslog.go new file mode 100644 index 000000000..f693739c3 --- /dev/null +++ b/lib/logstorage/pipe_unpack_syslog.go @@ -0,0 +1,146 @@ +package logstorage + +import ( + "fmt" + "sync/atomic" + "time" +) + +// pipeUnpackSyslog processes '| unpack_syslog ...' pipe. +// +// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe +type pipeUnpackSyslog struct { + // fromField is the field to unpack syslog fields from + fromField string + + // resultPrefix is prefix to add to unpacked field names + resultPrefix string + + keepOriginalFields bool + + // iff is an optional filter for skipping unpacking syslog + iff *ifFilter +} + +func (pu *pipeUnpackSyslog) String() string { + s := "unpack_syslog" + if pu.iff != nil { + s += " " + pu.iff.String() + } + if !isMsgFieldName(pu.fromField) { + s += " from " + quoteTokenIfNeeded(pu.fromField) + } + if pu.resultPrefix != "" { + s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix) + } + if pu.keepOriginalFields { + s += " keep_original_fields" + } + return s +} + +func (pu *pipeUnpackSyslog) updateNeededFields(neededFields, unneededFields fieldsSet) { + updateNeededFieldsForUnpackPipe(pu.fromField, nil, pu.keepOriginalFields, false, pu.iff, neededFields, unneededFields) +} + +func (pu *pipeUnpackSyslog) optimize() { + pu.iff.optimizeFilterIn() +} + +func (pu *pipeUnpackSyslog) hasFilterInWithQuery() bool { + return pu.iff.hasFilterInWithQuery() +} + +func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) { + iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc) + if err != nil { + return nil, err + } + puNew := *pu + puNew.iff = iffNew + return &puNew, nil +} + +func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { + unpackSyslog := func(uctx *fieldsUnpackerContext, s string) { + year := currentYear.Load() + p := getSyslogParser(int(year)) + + p.parse(s) + for _, f := range p.fields { + uctx.addField(f.Name, f.Value) + } + + putSyslogParser(p) + } + + return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff) +} + +var currentYear atomic.Int64 + +func init() { + year := time.Now().UTC().Year() + currentYear.Store(int64(year)) + go func() { + for { + t := time.Now().UTC() + nextYear := time.Date(t.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC) + d := nextYear.Sub(t) + time.Sleep(d) + year := time.Now().UTC().Year() + currentYear.Store(int64(year)) + } + }() 
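	// The goroutine above computes the duration until the next
	// Jan 1 00:00:00 UTC and sleeps exactly that long before refreshing
	// the cached year. Reading the year from an atomic.Int64 keeps the
	// per-message syslog parsing path free of time.Now().Year() calls.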
+} + +func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) { + if !lex.isKeyword("unpack_syslog") { + return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_syslog") + } + lex.nextToken() + + var iff *ifFilter + if lex.isKeyword("if") { + f, err := parseIfFilter(lex) + if err != nil { + return nil, err + } + iff = f + } + + fromField := "_msg" + if lex.isKeyword("from") { + lex.nextToken() + f, err := parseFieldName(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'from' field name: %w", err) + } + fromField = f + } + + resultPrefix := "" + if lex.isKeyword("result_prefix") { + lex.nextToken() + p, err := getCompoundToken(lex) + if err != nil { + return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err) + } + resultPrefix = p + } + + keepOriginalFields := false + if lex.isKeyword("keep_original_fields") { + lex.nextToken() + keepOriginalFields = true + } + + pu := &pipeUnpackSyslog{ + fromField: fromField, + resultPrefix: resultPrefix, + keepOriginalFields: keepOriginalFields, + iff: iff, + } + + return pu, nil +} diff --git a/lib/logstorage/pipe_unpack_syslog_test.go b/lib/logstorage/pipe_unpack_syslog_test.go new file mode 100644 index 000000000..ed1a1a7d4 --- /dev/null +++ b/lib/logstorage/pipe_unpack_syslog_test.go @@ -0,0 +1,239 @@ +package logstorage + +import ( + "testing" +) + +func TestParsePipeUnpackSyslogSuccess(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeSuccess(t, pipeStr) + } + + f(`unpack_syslog`) + f(`unpack_syslog keep_original_fields`) + f(`unpack_syslog if (a:x)`) + f(`unpack_syslog if (a:x) keep_original_fields`) + f(`unpack_syslog from x`) + f(`unpack_syslog from x keep_original_fields`) + f(`unpack_syslog if (a:x) from x`) + f(`unpack_syslog from x result_prefix abc`) + f(`unpack_syslog if (a:x) from x result_prefix abc`) + f(`unpack_syslog result_prefix abc`) + f(`unpack_syslog if (a:x) result_prefix abc`) +} + +func TestParsePipeUnpackSyslogFailure(t *testing.T) { + f := func(pipeStr string) { + t.Helper() + expectParsePipeFailure(t, pipeStr) + } + + f(`unpack_syslog foo`) + f(`unpack_syslog if`) + f(`unpack_syslog if (x:y) foobar`) + f(`unpack_syslog from`) + f(`unpack_syslog from x y`) + f(`unpack_syslog from x if`) + f(`unpack_syslog from x result_prefix`) + f(`unpack_syslog from x result_prefix a b`) + f(`unpack_syslog from x result_prefix a if`) + f(`unpack_syslog result_prefix`) + f(`unpack_syslog result_prefix a b`) + f(`unpack_syslog result_prefix a if`) +} + +func TestPipeUnpackSyslog(t *testing.T) { + f := func(pipeStr string, rows, rowsExpected [][]Field) { + t.Helper() + expectPipeResults(t, pipeStr, rows, rowsExpected) + } + + // no skip empty results + f("unpack_syslog", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // keep original fields + f("unpack_syslog keep_original_fields", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z 
mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + {"app_name", "foobar"}, + {"msg_id", "baz"}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"foo", "321"}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "foobar"}, + {"proc_id", "12345"}, + {"msg_id", "baz"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // unpack from other field + f("unpack_syslog from x", [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // failed if condition + f("unpack_syslog if (foo:bar)", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }) + + // matched if condition + f("unpack_syslog if (appname)", [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + }, [][]Field{ + { + {"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"priority", "165"}, + {"facility", "20"}, + {"severity", "5"}, + {"timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"hostname", "mymachine.example.com"}, + {"app_name", "appname"}, + {"proc_id", "12345"}, + {"msg_id", "ID47"}, + {"message", "This is a test message with structured data"}, + }, + }) + + // single row, unpack from missing field + f("unpack_syslog from x", [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }, [][]Field{ + { + {"_msg", `foo=bar`}, + }, + }) + + // single row, unpack from non-syslog field + f("unpack_syslog from x", [][]Field{ + { + {"x", `foobar`}, + }, + }, [][]Field{ + { + {"x", `foobar`}, + }, + }) + + // multiple rows with distinct number of fields + f("unpack_syslog from x result_prefix qwe_", [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + }, + { + {"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`}, + {"y", `z=bar`}, + }, + }, [][]Field{ + { + {"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`}, + {"qwe_priority", "165"}, + {"qwe_facility", "20"}, + {"qwe_severity", "5"}, + {"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"}, + {"qwe_hostname", "mymachine.example.com"}, + {"qwe_app_name", "appname"}, + {"qwe_proc_id", "12345"}, + {"qwe_msg_id", "ID47"}, + {"qwe_message", "This is a test 
message with structured data"}, + }, + { + {"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`}, + {"y", `z=bar`}, + {"qwe_priority", "163"}, + {"qwe_facility", "20"}, + {"qwe_severity", "3"}, + {"qwe_timestamp", "2024-12-13T18:21:43Z"}, + {"qwe_hostname", "mymachine.example.com"}, + {"qwe_app_name", "appname2"}, + {"qwe_proc_id", "345"}, + {"qwe_msg_id", "ID7"}, + {"qwe_message", "foobar"}, + }, + }) +} + +func TestPipeUnpackSyslogUpdateNeededFields(t *testing.T) { + f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { + t.Helper() + expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected) + } + + // all the needed fields + f("unpack_syslog", "*", "", "*", "") + f("unpack_syslog keep_original_fields", "*", "", "*", "") + f("unpack_syslog if (y:z) from x", "*", "", "*", "") + + // all the needed fields, unneeded fields do not intersect with src + f("unpack_syslog from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_syslog if (y:z) from x", "*", "f1,f2", "*", "f1,f2") + f("unpack_syslog if (f1:z) from x", "*", "f1,f2", "*", "f2") + + // all the needed fields, unneeded fields intersect with src + f("unpack_syslog from x", "*", "f2,x", "*", "f2") + f("unpack_syslog if (y:z) from x", "*", "f2,x", "*", "f2") + f("unpack_syslog if (f2:z) from x", "*", "f1,f2,x", "*", "f1") + + // needed fields do not intersect with src + f("unpack_syslog from x", "f1,f2", "", "f1,f2,x", "") + f("unpack_syslog if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "") + f("unpack_syslog if (f1:z) from x", "f1,f2", "", "f1,f2,x", "") + + // needed fields intersect with src + f("unpack_syslog from x", "f2,x", "", "f2,x", "") + f("unpack_syslog if (y:z) from x", "f2,x", "", "f2,x,y", "") + f("unpack_syslog if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "") +} diff --git a/lib/logstorage/rows.go b/lib/logstorage/rows.go index 192205f9c..b9e8df98e 100644 --- a/lib/logstorage/rows.go +++ b/lib/logstorage/rows.go @@ -64,7 +64,8 @@ func (f *Field) marshalToJSON(dst []byte) []byte { return dst } -func marshalFieldsToJSON(dst []byte, fields []Field) []byte { +// MarshalFieldsToJSON appends JSON-marshaled fields to dst and returns the result.
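+//
+// Usage sketch (added for illustration; relies only on the Field and
+// MarshalFieldsToJSON definitions in this file):
+//
+//	fields := []Field{{Name: "level", Value: "info"}, {Name: "_msg", Value: "ok"}}
+//	data := MarshalFieldsToJSON(nil, fields) // data: {"level":"info","_msg":"ok"}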
+func MarshalFieldsToJSON(dst []byte, fields []Field) []byte { dst = append(dst, '{') if len(fields) > 0 { dst = fields[0].marshalToJSON(dst) diff --git a/lib/logstorage/stats_row_any.go b/lib/logstorage/stats_row_any.go index 6707040cc..0060189ce 100644 --- a/lib/logstorage/stats_row_any.go +++ b/lib/logstorage/stats_row_any.go @@ -99,7 +99,7 @@ func (sap *statsRowAnyProcessor) updateState(br *blockResult, rowIdx int) int { func (sap *statsRowAnyProcessor) finalizeStats() string { bb := bbPool.Get() - bb.B = marshalFieldsToJSON(bb.B, sap.fields) + bb.B = MarshalFieldsToJSON(bb.B, sap.fields) result := string(bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/stats_row_max.go b/lib/logstorage/stats_row_max.go index 8f53f0f65..31c3689a9 100644 --- a/lib/logstorage/stats_row_max.go +++ b/lib/logstorage/stats_row_max.go @@ -206,7 +206,7 @@ func (smp *statsRowMaxProcessor) updateState(v string, br *blockResult, rowIdx i func (smp *statsRowMaxProcessor) finalizeStats() string { bb := bbPool.Get() - bb.B = marshalFieldsToJSON(bb.B, smp.fields) + bb.B = MarshalFieldsToJSON(bb.B, smp.fields) result := string(bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/stats_row_min.go b/lib/logstorage/stats_row_min.go index 9aa69681a..66415dd90 100644 --- a/lib/logstorage/stats_row_min.go +++ b/lib/logstorage/stats_row_min.go @@ -206,7 +206,7 @@ func (smp *statsRowMinProcessor) updateState(v string, br *blockResult, rowIdx i func (smp *statsRowMinProcessor) finalizeStats() string { bb := bbPool.Get() - bb.B = marshalFieldsToJSON(bb.B, smp.fields) + bb.B = MarshalFieldsToJSON(bb.B, smp.fields) result := string(bb.B) bbPool.Put(bb) diff --git a/lib/logstorage/storage_search_test.go b/lib/logstorage/storage_search_test.go index 488c46c48..9f5bde5af 100644 --- a/lib/logstorage/storage_search_test.go +++ b/lib/logstorage/storage_search_test.go @@ -969,7 +969,7 @@ func TestParseStreamFieldsSuccess(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %s", err) } - result := marshalFieldsToJSON(nil, labels) + result := MarshalFieldsToJSON(nil, labels) if string(result) != resultExpected { t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected) } diff --git a/lib/logstorage/syslog_parser.go b/lib/logstorage/syslog_parser.go new file mode 100644 index 000000000..6bf6f0082 --- /dev/null +++ b/lib/logstorage/syslog_parser.go @@ -0,0 +1,308 @@ +package logstorage + +import ( + "strconv" + "strings" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" +) + +func getSyslogParser(currentYear int) *syslogParser { + v := syslogParserPool.Get() + if v == nil { + v = &syslogParser{} + } + p := v.(*syslogParser) + p.currentYear = currentYear + return p +} + +func putSyslogParser(p *syslogParser) { + p.reset() + syslogParserPool.Put(p) +} + +var syslogParserPool sync.Pool + +type syslogParser struct { + currentYear int + + buf []byte + fields []Field +} + +func (p *syslogParser) reset() { + p.currentYear = 0 + p.resetFields() +} + +func (p *syslogParser) resetFields() { + p.buf = p.buf[:0] + + clear(p.fields) + p.fields = p.fields[:0] +} + +func (p *syslogParser) addField(name, value string) { + p.fields = append(p.fields, Field{ + Name: name, + Value: value, + }) +} + +func (p *syslogParser) parse(s string) { + p.resetFields() + + if len(s) == 0 { + // Cannot parse syslog message + return + } + + if s[0] != '<' { + p.parseNoHeader(s) + return + } + + // parse priority + s = s[1:] + n := strings.IndexByte(s, '>') + if n < 
0 { + // Cannot parse priority + return + } + priorityStr := s[:n] + s = s[n+1:] + + p.addField("priority", priorityStr) + priority, ok := tryParseUint64(priorityStr) + if !ok { + // Cannot parse priority + return + } + facility := priority / 8 + severity := priority % 8 + + bufLen := len(p.buf) + p.buf = marshalUint64String(p.buf, facility) + p.addField("facility", bytesutil.ToUnsafeString(p.buf[bufLen:])) + + bufLen = len(p.buf) + p.buf = marshalUint64String(p.buf, severity) + p.addField("severity", bytesutil.ToUnsafeString(p.buf[bufLen:])) + + p.parseNoHeader(s) +} + +func (p *syslogParser) parseNoHeader(s string) { + if len(s) == 0 { + return + } + if strings.HasPrefix(s, "1 ") { + p.parseRFC5424(s[2:]) + } else { + p.parseRFC3164(s) + } +} + +func (p *syslogParser) parseRFC5424(s string) { + // See https://datatracker.ietf.org/doc/html/rfc5424 + + if len(s) == 0 { + return + } + + // Parse timestamp + n := strings.IndexByte(s, ' ') + if n < 0 { + p.addField("timestamp", s) + return + } + p.addField("timestamp", s[:n]) + s = s[n+1:] + + // Parse hostname + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("hostname", s) + return + } + p.addField("hostname", s[:n]) + s = s[n+1:] + + // Parse app-name + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("app_name", s) + return + } + p.addField("app_name", s[:n]) + s = s[n+1:] + + // Parse procid + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("proc_id", s) + return + } + p.addField("proc_id", s[:n]) + s = s[n+1:] + + // Parse msgID + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("msg_id", s) + return + } + p.addField("msg_id", s[:n]) + s = s[n+1:] + + // Parse structured data + tail, ok := p.parseRFC5424SD(s) + if !ok { + return + } + s = tail + + // Parse message + p.addField("message", s) +} + +func (p *syslogParser) parseRFC5424SD(s string) (string, bool) { + if strings.HasPrefix(s, "- ") { + return s[2:], true + } + + for { + tail, ok := p.parseRFC5424SDLine(s) + if !ok { + return tail, false + } + s = tail + if strings.HasPrefix(s, " ") { + s = s[1:] + return s, true + } + } +} + +func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) { + if len(s) == 0 || s[0] != '[' { + return s, false + } + s = s[1:] + + n := strings.IndexAny(s, " ]") + if n < 0 { + return s, false + } + sdID := s[:n] + s = s[n:] + + // Parse structured data + i := 0 + for i < len(s) && s[i] != ']' { + // skip whitespace + if s[i] != ' ' { + return s, false + } + i++ + + // Parse name + n := strings.IndexByte(s[i:], '=') + if n < 0 { + return s, false + } + i += n + 1 + + // Parse value + qp, err := strconv.QuotedPrefix(s[i:]) + if err != nil { + return s, false + } + i += len(qp) + } + if i == len(s) { + return s, false + } + + sdValue := strings.TrimSpace(s[:i]) + p.addField(sdID, sdValue) + s = s[i+1:] + return s, true +} + +func (p *syslogParser) parseRFC3164(s string) { + // See https://datatracker.ietf.org/doc/html/rfc3164 + + // Parse timestamp + n := len(time.Stamp) + if len(s) < n { + return + } + t, err := time.Parse(time.Stamp, s[:n]) + if err != nil { + // TODO: fall back to parsing ISO8601 timestamp? 
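+		// Note: time.Stamp is the layout "Jan _2 15:04:05"; it carries no year,
+		// so a successful parse yields year 0 and the code below substitutes
+		// p.currentYear (falling back to the previous year when the resulting
+		// timestamp would land in the future).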
+ return + } + s = s[n:] + + t = t.UTC() + t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() { + // Adjust time to the previous year + t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC) + } + + bufLen := len(p.buf) + p.buf = marshalTimestampISO8601String(p.buf, t.UnixNano()) + p.addField("timestamp", bytesutil.ToUnsafeString(p.buf[bufLen:])) + + if len(s) == 0 || s[0] != ' ' { + // Missing space after the time field + return + } + s = s[1:] + + // Parse hostname + n = strings.IndexByte(s, ' ') + if n < 0 { + p.addField("hostname", s) + return + } + p.addField("hostname", s[:n]) + s = s[n+1:] + + // Parse tag (aka app_name) + n = strings.IndexAny(s, "[: ") + if n < 0 { + p.addField("app_name", s) + return + } + p.addField("app_name", s[:n]) + s = s[n:] + + // Parse proc_id + if len(s) == 0 { + return + } + if s[0] == '[' { + s = s[1:] + n = strings.IndexByte(s, ']') + if n < 0 { + return + } + p.addField("proc_id", s[:n]) + s = s[n+1:] + } + + // Skip optional ': ' in front of message + s = strings.TrimPrefix(s, ":") + s = strings.TrimPrefix(s, " ") + + if len(s) > 0 { + p.addField("message", s) + } +} diff --git a/lib/logstorage/syslog_parser_test.go b/lib/logstorage/syslog_parser_test.go new file mode 100644 index 000000000..1a61a1fe6 --- /dev/null +++ b/lib/logstorage/syslog_parser_test.go @@ -0,0 +1,67 @@ +package logstorage + +import ( + "testing" +) + +func TestSyslogParser(t *testing.T) { + f := func(s, resultExpected string) { + t.Helper() + + const currentYear = 2024 + p := getSyslogParser(currentYear) + defer putSyslogParser(p) + + p.parse(s) + result := MarshalFieldsToJSON(nil, p.fields) + if string(result) != resultExpected { + t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected) + } + } + + // RFC 3164 + f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", + `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...", + `{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`) + f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...", + `{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`) + f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...", + `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`) + f("Jun 3 12:08:33 - - Starting Update the local ESM caches...", + `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`) + + // RFC 5424 + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 
- This is a test message with structured data.`, + `{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`) + f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`) + f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`) + + // Incomplete RFC 3164 + f("", `{}`) + f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`) + f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`) + f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`) + f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`) + f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`) + + // Incomplete RFC 5424 + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`) + f(`<165>1 2023-06-03T17:42:32.123456789Z`, + `{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`) + f(`<165>1 `, + `{"priority":"165","facility":"20","severity":"5"}`) +} diff --git a/lib/logstorage/values_encoder.go b/lib/logstorage/values_encoder.go index 56ac11381..f1a909279 100644 --- a/lib/logstorage/values_encoder.go +++ b/lib/logstorage/values_encoder.go @@ -860,8 +860,8 @@ func 
tryParseDuration(s string) (int64, bool) { return nsecs, true } -// marshalDuration appends string representation of nsec duration to dst and returns the result. -func marshalDuration(dst []byte, nsecs int64) []byte { +// marshalDurationString appends string representation of nsec duration to dst and returns the result. +func marshalDurationString(dst []byte, nsecs int64) []byte { if nsecs == 0 { return append(dst, '0') } diff --git a/lib/logstorage/values_encoder_test.go b/lib/logstorage/values_encoder_test.go index f6d2d26da..369f12b06 100644 --- a/lib/logstorage/values_encoder_test.go +++ b/lib/logstorage/values_encoder_test.go @@ -378,11 +378,11 @@ func TestTryParseDuration_Failure(t *testing.T) { f("2s 3ms") } -func TestMarshalDuration(t *testing.T) { +func TestMarshalDurationString(t *testing.T) { f := func(nsecs int64, resultExpected string) { t.Helper() - result := marshalDuration(nil, nsecs) + result := marshalDurationString(nil, nsecs) if string(result) != resultExpected { t.Fatalf("unexpected result; got %q; want %q", result, resultExpected) } diff --git a/lib/promutils/time.go b/lib/promutils/time.go index 55e9700a0..b2ac05dc0 100644 --- a/lib/promutils/time.go +++ b/lib/promutils/time.go @@ -14,12 +14,12 @@ import ( // // It returns unix timestamp in milliseconds. func ParseTimeMsec(s string) (int64, error) { - currentTimestamp := float64(time.Now().UnixNano()) / 1e9 - secs, err := ParseTimeAt(s, currentTimestamp) + currentTimestamp := time.Now().UnixNano() + nsecs, err := ParseTimeAt(s, currentTimestamp) if err != nil { return 0, err } - msecs := int64(math.Round(secs * 1000)) + msecs := int64(math.Round(float64(nsecs) / 1e6)) return msecs, nil } @@ -33,13 +33,13 @@ const ( // // See https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats // -// It returns unix timestamp in seconds. -func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { +// It returns unix timestamp in nanoseconds. +func ParseTimeAt(s string, currentTimestamp int64) (int64, error) { if s == "now" { return currentTimestamp, nil } sOrig := s - tzOffset := float64(0) + tzOffset := int64(0) if len(sOrig) > 6 { // Try parsing timezone offset tz := sOrig[len(sOrig)-6:] @@ -53,7 +53,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if err != nil { return 0, fmt.Errorf("cannot parse minute from timezone offset %q: %w", tz, err) } - tzOffset = float64(hour*3600 + minute*60) + tzOffset = int64(hour*3600+minute*60) * 1e9 if isPlus { tzOffset = -tzOffset } @@ -71,7 +71,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if d > 0 { d = -d } - return currentTimestamp + float64(d)/1e9, nil + return currentTimestamp + int64(d), nil } if len(s) == 4 { // Parse YYYY @@ -83,7 +83,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if y > maxValidYear || y < minValidYear { return 0, fmt.Errorf("cannot parse year from %q: year must in range [%d, %d]", s, minValidYear, maxValidYear) } - return tzOffset + float64(t.UnixNano())/1e9, nil + return tzOffset + t.UnixNano(), nil } if !strings.Contains(sOrig, "-") { // Parse the timestamp in seconds or in milliseconds @@ -95,7 +95,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { // The timestamp is in milliseconds. Convert it to seconds. 
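		// After the optional division below, ts holds unix seconds as a float64;
		// the updated return value rounds it to millisecond precision and scales
		// to nanoseconds, e.g. ts = 1562529662.678 gives
		// int64(math.Round(ts*1e3))*1e6 = 1562529662678000000.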
ts /= 1000 } - return ts, nil + return int64(math.Round(ts*1e3)) * 1e6, nil } if len(s) == 7 { // Parse YYYY-MM @@ -103,7 +103,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if err != nil { return 0, err } - return tzOffset + float64(t.UnixNano())/1e9, nil + return tzOffset + t.UnixNano(), nil } if len(s) == 10 { // Parse YYYY-MM-DD @@ -111,7 +111,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if err != nil { return 0, err } - return tzOffset + float64(t.UnixNano())/1e9, nil + return tzOffset + t.UnixNano(), nil } if len(s) == 13 { // Parse YYYY-MM-DDTHH @@ -119,7 +119,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if err != nil { return 0, err } - return tzOffset + float64(t.UnixNano())/1e9, nil + return tzOffset + t.UnixNano(), nil } if len(s) == 16 { // Parse YYYY-MM-DDTHH:MM @@ -127,7 +127,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if err != nil { return 0, err } - return tzOffset + float64(t.UnixNano())/1e9, nil + return tzOffset + t.UnixNano(), nil } if len(s) == 19 { // Parse YYYY-MM-DDTHH:MM:SS @@ -135,12 +135,12 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) { if err != nil { return 0, err } - return tzOffset + float64(t.UnixNano())/1e9, nil + return tzOffset + t.UnixNano(), nil } // Parse RFC3339 t, err := time.Parse(time.RFC3339, sOrig) if err != nil { return 0, err } - return float64(t.UnixNano()) / 1e9, nil + return t.UnixNano(), nil } diff --git a/lib/promutils/time_test.go b/lib/promutils/time_test.go index 9ce9c9a6c..2f9e76b52 100644 --- a/lib/promutils/time_test.go +++ b/lib/promutils/time_test.go @@ -6,7 +6,7 @@ import ( ) func TestParseTimeAtSuccess(t *testing.T) { - f := func(s string, currentTime, resultExpected float64) { + f := func(s string, currentTime, resultExpected int64) { t.Helper() result, err := ParseTimeAt(s, currentTime) if err != nil { @@ -17,65 +17,65 @@ func TestParseTimeAtSuccess(t *testing.T) { } } - now := float64(time.Now().UnixNano()) / 1e9 + now := time.Now().UnixNano() // unix timestamp in seconds - f("1562529662", now, 1562529662) - f("1562529662.678", now, 1562529662.678) + f("1562529662", now, 1562529662*1e9) + f("1562529662.678", now, 1562529662678*1e6) // unix timestamp in milliseconds - f("1562529662678", now, 1562529662.678) + f("1562529662678", now, 1562529662678*1e6) // duration relative to the current time f("now", now, now) - f("1h5s", now, now-3605) + f("1h5s", now, now-3605*1e9) // negative duration relative to the current time - f("-5m", now, now-5*60) - f("-123", now, now-123) - f("-123.456", now, now-123.456) - f("now-1h5m", now, now-(3600+5*60)) + f("-5m", now, now-5*60*1e9) + f("-123", now, now-123*1e9) + f("-123.456", now, now-123456*1e6) + f("now-1h5m", now, now-(3600+5*60)*1e9) // Year - f("2023", now, 1.6725312e+09) - f("2023Z", now, 1.6725312e+09) - f("2023+02:00", now, 1.672524e+09) - f("2023-02:00", now, 1.6725384e+09) + f("2023", now, 1.6725312e+09*1e9) + f("2023Z", now, 1.6725312e+09*1e9) + f("2023+02:00", now, 1.672524e+09*1e9) + f("2023-02:00", now, 1.6725384e+09*1e9) // Year and month - f("2023-05", now, 1.6828992e+09) - f("2023-05Z", now, 1.6828992e+09) - f("2023-05+02:00", now, 1.682892e+09) - f("2023-05-02:00", now, 1.6829064e+09) + f("2023-05", now, 1.6828992e+09*1e9) + f("2023-05Z", now, 1.6828992e+09*1e9) + f("2023-05+02:00", now, 1.682892e+09*1e9) + f("2023-05-02:00", now, 1.6829064e+09*1e9) // Year, month and day - f("2023-05-20", now, 1.6845408e+09) - 
f("2023-05-20Z", now, 1.6845408e+09) - f("2023-05-20+02:30", now, 1.6845318e+09) - f("2023-05-20-02:30", now, 1.6845498e+09) + f("2023-05-20", now, 1.6845408e+09*1e9) + f("2023-05-20Z", now, 1.6845408e+09*1e9) + f("2023-05-20+02:30", now, 1.6845318e+09*1e9) + f("2023-05-20-02:30", now, 1.6845498e+09*1e9) // Year, month, day and hour - f("2023-05-20T04", now, 1.6845552e+09) - f("2023-05-20T04Z", now, 1.6845552e+09) - f("2023-05-20T04+02:30", now, 1.6845462e+09) - f("2023-05-20T04-02:30", now, 1.6845642e+09) + f("2023-05-20T04", now, 1.6845552e+09*1e9) + f("2023-05-20T04Z", now, 1.6845552e+09*1e9) + f("2023-05-20T04+02:30", now, 1.6845462e+09*1e9) + f("2023-05-20T04-02:30", now, 1.6845642e+09*1e9) // Year, month, day, hour and minute - f("2023-05-20T04:57", now, 1.68455862e+09) - f("2023-05-20T04:57Z", now, 1.68455862e+09) - f("2023-05-20T04:57+02:30", now, 1.68454962e+09) - f("2023-05-20T04:57-02:30", now, 1.68456762e+09) + f("2023-05-20T04:57", now, 1.68455862e+09*1e9) + f("2023-05-20T04:57Z", now, 1.68455862e+09*1e9) + f("2023-05-20T04:57+02:30", now, 1.68454962e+09*1e9) + f("2023-05-20T04:57-02:30", now, 1.68456762e+09*1e9) // Year, month, day, hour, minute and second - f("2023-05-20T04:57:43", now, 1.684558663e+09) - f("2023-05-20T04:57:43Z", now, 1.684558663e+09) - f("2023-05-20T04:57:43+02:30", now, 1.684549663e+09) - f("2023-05-20T04:57:43-02:30", now, 1.684567663e+09) + f("2023-05-20T04:57:43", now, 1.684558663e+09*1e9) + f("2023-05-20T04:57:43Z", now, 1.684558663e+09*1e9) + f("2023-05-20T04:57:43+02:30", now, 1.684549663e+09*1e9) + f("2023-05-20T04:57:43-02:30", now, 1.684567663e+09*1e9) // milliseconds - f("2023-05-20T04:57:43.123Z", now, 1.6845586631230001e+09) - f("2023-05-20T04:57:43.123456789+02:30", now, 1.6845496631234567e+09) - f("2023-05-20T04:57:43.123456789-02:30", now, 1.6845676631234567e+09) + f("2023-05-20T04:57:43.123Z", now, 1684558663123000000) + f("2023-05-20T04:57:43.123456789+02:30", now, 1684549663123456789) + f("2023-05-20T04:57:43.123456789-02:30", now, 1684567663123456789) } func TestParseTimeMsecFailure(t *testing.T) {