mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/logstorage: work-in-progress
This commit is contained in:
parent
a5f81f67fd
commit
539fce9227
42 changed files with 1954 additions and 152 deletions
|
@ -298,11 +298,36 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
|||
httpserver.Errorf(w, r, "%s", err)
|
||||
return
|
||||
}
|
||||
if limit > 0 {
|
||||
q.AddPipeLimit(uint64(limit))
|
||||
}
|
||||
|
||||
bw := getBufferedWriter(w)
|
||||
defer func() {
|
||||
bw.FlushIgnoreErrors()
|
||||
putBufferedWriter(bw)
|
||||
}()
|
||||
w.Header().Set("Content-Type", "application/stream+json")
|
||||
|
||||
if limit > 0 {
|
||||
if q.CanReturnLastNResults() {
|
||||
rows, err := getLastNQueryResults(ctx, tenantIDs, q, limit)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return
|
||||
}
|
||||
bb := blockResultPool.Get()
|
||||
b := bb.B
|
||||
for i := range rows {
|
||||
b = logstorage.MarshalFieldsToJSON(b[:0], rows[i].fields)
|
||||
b = append(b, '\n')
|
||||
bw.WriteIgnoreErrors(b)
|
||||
}
|
||||
bb.B = b
|
||||
blockResultPool.Put(bb)
|
||||
return
|
||||
}
|
||||
|
||||
q.AddPipeLimit(uint64(limit))
|
||||
q.Optimize()
|
||||
}
|
||||
|
||||
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
|
||||
if len(columns) == 0 || len(columns[0].Values) == 0 {
|
||||
|
@ -317,20 +342,103 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
|||
blockResultPool.Put(bb)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/stream+json")
|
||||
q.Optimize()
|
||||
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
|
||||
|
||||
bw.FlushIgnoreErrors()
|
||||
putBufferedWriter(bw)
|
||||
|
||||
if err != nil {
|
||||
if err := vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock); err != nil {
|
||||
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", q, err)
|
||||
}
|
||||
}
|
||||
|
||||
var blockResultPool bytesutil.ByteBufferPool
|
||||
|
||||
type row struct {
|
||||
timestamp int64
|
||||
fields []logstorage.Field
|
||||
}
|
||||
|
||||
func getLastNQueryResults(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
|
||||
q.AddPipeLimit(uint64(limit + 1))
|
||||
q.Optimize()
|
||||
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(rows) <= limit {
|
||||
// Fast path - the requested time range contains up to limit rows.
|
||||
sortRowsByTime(rows)
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
// Slow path - search for the time range with the requested limit rows.
|
||||
start, end := q.GetFilterTimeRange()
|
||||
d := end/2 - start/2
|
||||
start += d
|
||||
|
||||
qOrig := q
|
||||
for {
|
||||
q = qOrig.Clone()
|
||||
q.AddTimeFilter(start, end)
|
||||
rows, err := getQueryResultsWithLimit(ctx, tenantIDs, q, limit+1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(rows) == limit || len(rows) > limit && d < 10e6 || d == 0 {
|
||||
sortRowsByTime(rows)
|
||||
if len(rows) > limit {
|
||||
rows = rows[len(rows)-limit:]
|
||||
}
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
lastBit := d & 1
|
||||
d /= 2
|
||||
if len(rows) > limit {
|
||||
start += d
|
||||
} else {
|
||||
start -= d + lastBit
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func sortRowsByTime(rows []row) {
|
||||
sort.Slice(rows, func(i, j int) bool {
|
||||
return rows[i].timestamp < rows[j].timestamp
|
||||
})
|
||||
}
|
||||
|
||||
func getQueryResultsWithLimit(ctx context.Context, tenantIDs []logstorage.TenantID, q *logstorage.Query, limit int) ([]row, error) {
|
||||
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
var rows []row
|
||||
var rowsLock sync.Mutex
|
||||
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
|
||||
rowsLock.Lock()
|
||||
defer rowsLock.Unlock()
|
||||
|
||||
for i, timestamp := range timestamps {
|
||||
fields := make([]logstorage.Field, len(columns))
|
||||
for j := range columns {
|
||||
f := &fields[j]
|
||||
f.Name = strings.Clone(columns[j].Name)
|
||||
f.Value = strings.Clone(columns[j].Values[i])
|
||||
}
|
||||
rows = append(rows, row{
|
||||
timestamp: timestamp,
|
||||
fields: fields,
|
||||
})
|
||||
}
|
||||
|
||||
if len(rows) >= limit {
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
if err := vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return rows, nil
|
||||
}
|
||||
|
||||
func parseCommonArgs(r *http.Request) (*logstorage.Query, []logstorage.TenantID, error) {
|
||||
// Extract tenantID
|
||||
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
||||
|
@ -373,10 +481,10 @@ func getTimeNsec(r *http.Request, argName string) (int64, bool, error) {
|
|||
if s == "" {
|
||||
return 0, false, nil
|
||||
}
|
||||
currentTimestamp := float64(time.Now().UnixNano()) / 1e9
|
||||
secs, err := promutils.ParseTimeAt(s, currentTimestamp)
|
||||
currentTimestamp := time.Now().UnixNano()
|
||||
nsecs, err := promutils.ParseTimeAt(s, currentTimestamp)
|
||||
if err != nil {
|
||||
return 0, false, fmt.Errorf("cannot parse %s=%s: %w", argName, s, err)
|
||||
}
|
||||
return int64(secs * 1e9), true, nil
|
||||
return nsecs, true, nil
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ services:
|
|||
# storing logs and serving read queries.
|
||||
victorialogs:
|
||||
container_name: victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
command:
|
||||
- "--storageDataPath=/vlogs"
|
||||
- "--httpListenAddr=:9428"
|
||||
|
|
|
@ -22,7 +22,7 @@ services:
|
|||
- -beat.uri=http://filebeat-victorialogs:5066
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-filebeat-docker-vl:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -13,7 +13,7 @@ services:
|
|||
- "5140:5140"
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-filebeat-syslog-vl:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -11,7 +11,7 @@ services:
|
|||
- "5140:5140"
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-fluentbit-vl:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -14,7 +14,7 @@ services:
|
|||
- "5140:5140"
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-logstash-vl:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -12,7 +12,7 @@ services:
|
|||
- "5140:5140"
|
||||
|
||||
vlogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-promtail-docker:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -22,7 +22,7 @@ services:
|
|||
condition: service_healthy
|
||||
|
||||
victorialogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- victorialogs-vector-docker-vl:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -3,7 +3,7 @@ version: '3'
|
|||
services:
|
||||
# Run `make package-victoria-logs` to build victoria-logs image
|
||||
vlogs:
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
image: docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
volumes:
|
||||
- vlogs:/vlogs
|
||||
ports:
|
||||
|
|
|
@ -19,6 +19,15 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
|
||||
## tip
|
||||
|
||||
## [v0.16.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.16.0-victorialogs)
|
||||
|
||||
Released at 2024-06-04
|
||||
|
||||
* FEATURE: add [`unpack_syslog` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe) for unpacking [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
* FEATURE: parse timestamps in [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) with nanosecond precision.
|
||||
* FEATURE: return the last `N` matching logs from [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs) with the maximum timestamps if `limit=N` query arg is passed to it. Previously a random subset of matching logs could be returned, which could complicate investigation of the returned logs.
|
||||
* FEATURE: add [`drop_empty_fields` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe) for dropping [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
|
||||
|
||||
## [v0.15.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.15.0-victorialogs)
|
||||
|
||||
Released at 2024-05-30
|
||||
|
|
|
@ -13,7 +13,10 @@ aliases:
|
|||
# LogsQL
|
||||
|
||||
LogsQL is a simple yet powerful query language for [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/).
|
||||
It provides the following features:
|
||||
See [examples](https://docs.victoriametrics.com/victorialogs/logsql-examples/) and [tutorial](#logsql-tutorial)
|
||||
in order to feel the language.
|
||||
|
||||
LogsQL provides the following features:
|
||||
|
||||
- Full-text search across [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
See [word filter](#word-filter), [phrase filter](#phrase-filter) and [prefix filter](#prefix-filter).
|
||||
|
@ -194,7 +197,8 @@ _time:5m error | stats count() logs_with_error
|
|||
|
||||
Finally, it is recommended reading [performance tips](#performance-tips).
|
||||
|
||||
Now you are familiar with LogsQL basics. Read [query syntax](#query-syntax) if you want to continue learning LogsQL.
|
||||
Now you are familiar with LogsQL basics. See [LogsQL examples](https://docs.victoriametrics.com/victorialogs/logsql-examples/) and [query syntax](#query-syntax)
|
||||
if you want to continue learning LogsQL.
|
||||
|
||||
### Key concepts
|
||||
|
||||
|
@ -287,7 +291,7 @@ _time:1h AND error
|
|||
|
||||
The following formats are supported for `_time` filter:
|
||||
|
||||
- `_time:duration` matches logs with timestamps on the time range `(now-duration, now]`. Examples:
|
||||
- `_time:duration` matches logs with timestamps on the time range `(now-duration, now]`, where `duration` can have [these values](#duration-values). Examples:
|
||||
- `_time:5m` - returns logs for the last 5 minutes
|
||||
- `_time:2.5d15m42.345s` - returns logs for the last 2.5 days, 15 minutes and 42.345 seconds
|
||||
- `_time:1y` - returns logs for the last year
|
||||
|
@ -1151,6 +1155,7 @@ LogsQL supports the following pipes:
|
|||
|
||||
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`drop_empty_fields`](#drop_empty_fields-pipe) drops [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values.
|
||||
- [`extract`](#extract-pipe) extracts the specified text into the given log fields.
|
||||
- [`extract_regexp`](#extract_regexp-pipe) extracts the specified text into the given log fields via [RE2 regular expressions](https://github.com/google/re2/wiki/Syntax).
|
||||
- [`field_names`](#field_names-pipe) returns all the names of [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
@ -1168,8 +1173,9 @@ LogsQL supports the following pipes:
|
|||
- [`sort`](#sort-pipe) sorts logs by the given [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`stats`](#stats-pipe) calculates various stats over the selected logs.
|
||||
- [`uniq`](#uniq-pipe) returns unique log entires.
|
||||
- [`unpack_json`](#unpack_json-pipe) unpacks JSON fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) fields from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`unpack_json`](#unpack_json-pipe) unpacks JSON messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`unpack_logfmt`](#unpack_logfmt-pipe) unpacks [logfmt](https://brandur.org/logfmt) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`unpack_syslog`](#unpack_syslog-pipe) [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
||||
### copy pipe
|
||||
|
@ -1215,6 +1221,22 @@ See also:
|
|||
- [`rename` pipe](#rename-pipe)
|
||||
- [`fields` pipe](#fields-pipe)
|
||||
|
||||
### drop_empty_fields pipe
|
||||
|
||||
`| drop_empty_fields` pipe drops [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with empty values. It also skips log entries with zero non-empty fields.
|
||||
|
||||
For example, the following query drops possible empty `email` field generated by [`extract` pipe](#extract-pipe) if the `foo` field doesn't contain email:
|
||||
|
||||
```logsql
|
||||
_time:5m | extract 'email: <email>,' from foo | drop_empty_fields
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [`filter` pipe](#filter-pipe)
|
||||
- [`extract` pipe](#extract-pipe)
|
||||
|
||||
|
||||
### extract pipe
|
||||
|
||||
`| extract "pattern" from field_name` [pipe](#pipes) allows extracting arbitrary text into output fields according to the [`pattern`](#format-for-extract-pipe-pattern) from the given
|
||||
|
@ -1291,6 +1313,7 @@ the corresponding matching substring to.
|
|||
Matching starts from the first occurrence of the `text1` in the input text. If the `pattern` starts with `<field1>` and doesn't contain `text1`,
|
||||
then the matching starts from the beginning of the input text. Matching is performed sequentially according to the `pattern`. If some `textX` isn't found
|
||||
in the remaining input text, then the remaining named placeholders receive empty string values and the matching finishes prematurely.
|
||||
The empty string values can be dropped with [`drop_empty_fields` pipe](#drop_empty_fields-pipe).
|
||||
|
||||
Matching finishes successfully when `textN+1` is found in the input text.
|
||||
If the `pattern` ends with `<fieldN>` and doesn't contain `textN+1`, then the `<fieldN>` matches the remaining input text.
|
||||
|
@ -2105,7 +2128,7 @@ See also:
|
|||
|
||||
### unpack_json pipe
|
||||
|
||||
`| unpack_json from field_name` pipe unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
`| unpack_json from field_name` [pipe](#pipes) unpacks `{"k1":"v1", ..., "kN":"vN"}` JSON from the given input [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
into `k1`, ... `kN` output field names with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
|
||||
|
||||
Nested JSON is unpacked according to the rules defined [here](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
@ -2172,6 +2195,7 @@ See also:
|
|||
|
||||
- [Conditional `unpack_json`](#conditional-unpack_json)
|
||||
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
|
||||
- [`unpack_syslog` pipe](#unpack_syslog-pipe)
|
||||
- [`extract` pipe](#extract-pipe)
|
||||
- [`unroll` pipe](#unroll-pipe)
|
||||
- [`pack_json` pipe](#pack_json-pipe)
|
||||
|
@ -2188,7 +2212,7 @@ _time:5m | unpack_json if (ip:"") from foo
|
|||
|
||||
### unpack_logfmt pipe
|
||||
|
||||
`| unpack_logfmt from field_name` pipe unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
|
||||
`| unpack_logfmt from field_name` [pipe](#pipes) unpacks `k1=v1 ... kN=vN` [logfmt](https://brandur.org/logfmt) fields
|
||||
from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) into `k1`, ... `kN` field names
|
||||
with the corresponding `v1`, ..., `vN` values. It overrides existing fields with names from the `k1`, ..., `kN` list. Other fields remain untouched.
|
||||
|
||||
|
@ -2235,7 +2259,7 @@ in [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#mes
|
|||
_time:5m | extract ' ip=<ip>'
|
||||
```
|
||||
|
||||
If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from JSON,
|
||||
If you want to make sure that the unpacked [logfmt](https://brandur.org/logfmt) fields do not clash with the existing fields, then specify common prefix for all the fields extracted from logfmt,
|
||||
by adding `result_prefix "prefix_name"` to `unpack_logfmt`. For example, the following query adds `foo_` prefix for all the unpacked fields
|
||||
from `foo` field:
|
||||
|
||||
|
@ -2256,6 +2280,7 @@ See also:
|
|||
|
||||
- [Conditional unpack_logfmt](#conditional-unpack_logfmt)
|
||||
- [`unpack_json` pipe](#unpack_json-pipe)
|
||||
- [`unpack_syslog` pipe](#unpack_syslog-pipe)
|
||||
- [`extract` pipe](#extract-pipe)
|
||||
|
||||
#### Conditional unpack_logfmt
|
||||
|
@ -2269,6 +2294,86 @@ only if `ip` field in the current log entry isn't set or empty:
|
|||
_time:5m | unpack_logfmt if (ip:"") from foo
|
||||
```
|
||||
|
||||
### unpack_syslog pipe
|
||||
|
||||
`| unpack_syslog from field_name` [pipe](#pipes) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message
|
||||
from the given [`field_name`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). It understands the following Syslog formats:
|
||||
|
||||
- [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164) aka `<PRI>MMM DD hh:mm:ss HOSTNAME TAG: MESSAGE`
|
||||
- [RFC5424](https://datatracker.ietf.org/doc/html/rfc5424) aka `<PRI>VERSION TIMESTAMP HOSTNAME APP-NAME PROCID MSGID [STRUCTURED-DATA] MESSAGE`
|
||||
|
||||
The following fields are unpacked:
|
||||
|
||||
- `priority` - it is obtained from `PRI`.
|
||||
- `facility` - it is calculated as `PRI / 8`.
|
||||
- `severity` - it is calculated as `PRI % 8`.
|
||||
- `timestamp` - timestamp in [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601). The `MMM DD hh:mm:ss` timestamp in [RFC3164](https://datatracker.ietf.org/doc/html/rfc3164)
|
||||
is automatically converted into [ISO8601 format](https://en.wikipedia.org/wiki/ISO_8601) by assuming that the timestamp belongs to the last 12 months.
|
||||
- `hostname`
|
||||
- `app_name`
|
||||
- `proc_id`
|
||||
- `msg_id`
|
||||
- `message`
|
||||
|
||||
The `[STRUCTURED-DATA]` is parsed into fields with the `SD-ID` name and `param1="value1" ... paramN="valueN"` value
|
||||
according to [the specification](https://datatracker.ietf.org/doc/html/rfc5424#section-6.3). The value then can be parsed to separate fields with [`unpack_logfmt` pipe](#unpack_logfmt-pipe).
|
||||
|
||||
For example, the following query unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) message from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
|
||||
across logs for the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_syslog from _msg
|
||||
```
|
||||
|
||||
The `from _json` part can be omitted when [syslog](https://en.wikipedia.org/wiki/Syslog) message is unpacked
|
||||
from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
|
||||
The following query is equivalent to the previous one:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_syslog
|
||||
```
|
||||
|
||||
If it is needed to preserve the original non-empty field values, then add `keep_original_fields` to the end of `unpack_syslog ...`:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_syslog keep_original_fields
|
||||
```
|
||||
|
||||
If you want to make sure that the unpacked [syslog](https://en.wikipedia.org/wiki/Syslog) fields do not clash with the existing fields,
|
||||
then specify common prefix for all the fields extracted from syslog, by adding `result_prefix "prefix_name"` to `unpack_syslog`.
|
||||
For example, the following query adds `foo_` prefix for all the unpacked fields from `foo` field:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_syslog from foo result_prefix "foo_"
|
||||
```
|
||||
|
||||
Performance tips:
|
||||
|
||||
- It is better from performance and resource usage PoV ingesting parsed [syslog](https://en.wikipedia.org/wiki/Syslog) messages into VictoriaLogs
|
||||
according to the [supported data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model)
|
||||
instead of ingesting unparsed syslog lines into VictoriaLogs and then parsing them at query time with [`unpack_syslog` pipe](#unpack_syslog-pipe).
|
||||
|
||||
- It is recommended using more specific [log filters](#filters) in order to reduce the number of log entries, which are passed to `unpack_syslog`.
|
||||
See [general performance tips](#performance-tips) for details.
|
||||
|
||||
See also:
|
||||
|
||||
- [Conditional unpack_syslog](#conditional-unpack_syslog)
|
||||
- [`unpack_json` pipe](#unpack_json-pipe)
|
||||
- [`unpack_logfmt` pipe](#unpack_logfmt-pipe)
|
||||
- [`extract` pipe](#extract-pipe)
|
||||
|
||||
#### Conditional unpack_syslog
|
||||
|
||||
If the [`unpack_syslog` pipe](#unpack_syslog-pipe) musn't be applied to every [log entry](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
|
||||
then add `if (<filters>)` after `unpack_syslog`.
|
||||
The `<filters>` can contain arbitrary [filters](#filters). For example, the following query unpacks syslog message fields from `foo` field
|
||||
only if `hostname` field in the current log entry isn't set or empty:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_syslog if (hostname:"") from foo
|
||||
```
|
||||
|
||||
### unroll pipe
|
||||
|
||||
`| unroll by (field1, ..., fieldN)` [pipe](#pipes) can be used for unrolling JSON arrays from `field1`, `fieldN`
|
||||
|
|
|
@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i
|
|||
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
|
||||
|
||||
```sh
|
||||
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.15.0-victorialogs/victoria-logs-linux-amd64-v0.15.0-victorialogs.tar.gz
|
||||
tar xzf victoria-logs-linux-amd64-v0.15.0-victorialogs.tar.gz
|
||||
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.16.0-victorialogs/victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz
|
||||
tar xzf victoria-logs-linux-amd64-v0.16.0-victorialogs.tar.gz
|
||||
./victoria-logs-prod
|
||||
```
|
||||
|
||||
|
@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container:
|
|||
|
||||
```sh
|
||||
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
|
||||
docker.io/victoriametrics/victoria-logs:v0.15.0-victorialogs
|
||||
docker.io/victoriametrics/victoria-logs:v0.16.0-victorialogs
|
||||
```
|
||||
|
||||
See also:
|
||||
|
|
|
@ -10,7 +10,7 @@ aliases:
|
|||
VictoriaLogs is [open source](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/app/victoria-logs) user-friendly database for logs
|
||||
from [VictoriaMetrics](https://github.com/VictoriaMetrics/VictoriaMetrics/).
|
||||
|
||||
VictoriaLogs provides the following key features:
|
||||
VictoriaLogs provides the following features:
|
||||
|
||||
- VictoriaLogs can accept logs from popular log collectors. See [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/).
|
||||
- VictoriaLogs is much easier to set up and operate compared to Elasticsearch and Grafana Loki.
|
||||
|
|
399
docs/VictoriaLogs/logsql-examples.md
Normal file
399
docs/VictoriaLogs/logsql-examples.md
Normal file
|
@ -0,0 +1,399 @@
|
|||
---
|
||||
sort: 100
|
||||
weight: 100
|
||||
title: LogsQL examples
|
||||
menu:
|
||||
docs:
|
||||
parent: "victorialogs"
|
||||
weight: 100
|
||||
---
|
||||
|
||||
# LogsQL examples
|
||||
|
||||
## How to select recently ingested logs?
|
||||
|
||||
[Run](https://docs.victoriametrics.com/victorialogs/querying/) the following query:
|
||||
|
||||
```logsql
|
||||
_time:5m
|
||||
```
|
||||
|
||||
It returns logs over the last 5 minutes by using [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter).
|
||||
The logs are returned in arbitrary order because of performance reasons.
|
||||
Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) to the query if you need sorting
|
||||
the returned logs by some field (usually [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field)):
|
||||
|
||||
```logsql
|
||||
_time:5m | sort by (_time)
|
||||
|
||||
If the number of returned logs is too big, it may be limited with the [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe).
|
||||
For example, the following query returns 10 most recent logs, which were ingested during the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | sort by (_time desc) | limit 10
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [How to count the number of matching logs](#how-to-count-the-number-of-matching-logs)
|
||||
|
||||
## How to select logs with the given word in log message?
|
||||
|
||||
Just put the needed [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the query.
|
||||
For example, the following query returns all the logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
|
||||
in [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
|
||||
```logsql
|
||||
error
|
||||
```
|
||||
|
||||
If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
|
||||
for limiting the time range for the selected logs. For example, the following query returns logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
|
||||
over the last hour:
|
||||
|
||||
```logsql
|
||||
error _time:1h
|
||||
```
|
||||
|
||||
If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
|
||||
to the query. For example, the following query selects logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word),
|
||||
which do not contain `kubernetes` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), over the last hour:
|
||||
|
||||
```logsql
|
||||
error !kubernetes _time:1h
|
||||
```
|
||||
|
||||
The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
|
||||
for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query
|
||||
sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
|
||||
|
||||
```logsql
|
||||
error _time:1h | sort by (_time)
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [How to select logs with all the given words in log message?](#how-to-select-logs-with-all-the-given-words-in-log-message)
|
||||
- [How to select logs with some of the given words in log message?](#how-to-select-logs-with-some-of-the-given-words-in-log-message)
|
||||
- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
|
||||
- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
|
||||
- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
|
||||
- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
|
||||
- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
|
||||
|
||||
|
||||
## How to skip logs with the given word in log message?
|
||||
|
||||
Use [`NOT` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter). For example, the following query returns all the logs
|
||||
without the `INFO` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
|
||||
```logsq
|
||||
!INFO
|
||||
```
|
||||
|
||||
If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
|
||||
for limiting the time range for the selected logs. For example, the following query returns matching logs over the last hour:
|
||||
|
||||
```logsql
|
||||
!INFO _time:1h
|
||||
```
|
||||
|
||||
If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
|
||||
to the query. For example, the following query selects logs without `INFO` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word),
|
||||
which contain `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), over the last hour:
|
||||
|
||||
```logsql
|
||||
!INFO error _time:1h
|
||||
```
|
||||
|
||||
The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
|
||||
for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query
|
||||
sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
|
||||
|
||||
```logsql
|
||||
!INFO _time:1h | sort by (_time)
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [How to select logs with all the given words in log message?](#how-to-select-logs-with-all-the-given-words-in-log-message)
|
||||
- [How to select logs with some of given words in log message?](#how-to-select-logs-with-some-of-the-given-words-in-log-message)
|
||||
- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
|
||||
- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
|
||||
- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
|
||||
- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
|
||||
|
||||
|
||||
## How to select logs with all the given words in log message?
|
||||
|
||||
Just enumerate the needed [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the query, by deliming them with whitespace.
|
||||
For example, the following query selects logs containing both `error` and `kubernetes` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word)
|
||||
in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
|
||||
```logsql
|
||||
error kubernetes
|
||||
```
|
||||
|
||||
This query uses [`AND` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter).
|
||||
|
||||
If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
|
||||
for limiting the time range for the selected logs. For example, the following query returns matching logs over the last hour:
|
||||
|
||||
```logsql
|
||||
error kubernetes _time:1h
|
||||
```
|
||||
|
||||
If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
|
||||
to the query. For example, the following query selects logs with `error` and `kubernetes` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word)
|
||||
from [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) containing `container="my-app"` field, over the last hour:
|
||||
|
||||
```logsql
|
||||
error kubernetes _stream:{container="my-app"} _time:1h
|
||||
```
|
||||
|
||||
The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
|
||||
for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query
|
||||
sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
|
||||
|
||||
```logsql
|
||||
error kubernetes _time:1h | sort by (_time)
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [How to select logs with some of given words in log message?](#how-to-select-logs-with-some-of-the-given-words-in-log-message)
|
||||
- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
|
||||
- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
|
||||
- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
|
||||
- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
|
||||
- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
|
||||
|
||||
|
||||
## How to select logs with some of the given words in log message?
|
||||
|
||||
Put the needed [words](https://docs.victoriametrics.com/victorialogs/logsql/#word) into `(...)`, by delimiting them with ` or `.
|
||||
For example, the following query selects logs with `error`, `ERROR` or `Error` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word)
|
||||
in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
|
||||
```logsql
|
||||
(error or ERROR or Error)
|
||||
```
|
||||
|
||||
This query uses [`OR` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter).
|
||||
|
||||
If the number of returned logs is too big, then add [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
|
||||
for limiting the time range for the selected logs. For example, the following query returns matching logs over the last hour:
|
||||
|
||||
```logsql
|
||||
(error or ERROR or Error) _time:1h
|
||||
```
|
||||
|
||||
If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
|
||||
to the query. For example, the following query selects logs without `error`, `ERROR` or `Error` [words](https://docs.victoriametrics.com/victorialogs/logsql/#word),
|
||||
which do not contain `kubernetes` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word), over the last hour:
|
||||
|
||||
```logsql
|
||||
(error or ERROR or Error) !kubernetes _time:1h
|
||||
```
|
||||
|
||||
The logs are returned in arbitrary order because of performance reasons. Add [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
|
||||
for sorting logs by the needed [fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model). For example, the following query
|
||||
sorts the selected logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
|
||||
|
||||
```logsql
|
||||
(error or ERROR or Error) _time:1h | sort by (_time)
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [How to select logs with all the given words in log message?](#how-to-select-logs-with-all-the-given-words-in-log-message)
|
||||
- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
|
||||
- [Filtering by phrase](https://docs.victoriametrics.com/victorialogs/logsql/#phrase-filter)
|
||||
- [Filtering by prefix](https://docs.victoriametrics.com/victorialogs/logsql/#prefix-filter)
|
||||
- [Filtering by regular expression](https://docs.victoriametrics.com/victorialogs/logsql/#regexp-filter)
|
||||
- [Filtering by substring](https://docs.victoriametrics.com/victorialogs/logsql/#substring-filter)
|
||||
|
||||
|
||||
## How to select logs from the given application instance?
|
||||
|
||||
Make sure the application is properly configured with [stream-level log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
|
||||
Then just use [`_stream` filter](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter) for selecting logs for the given application instance.
|
||||
For example, if the application contains `job="app-42"` and `instance="host-123:5678"` [stream fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields),
|
||||
then the following query selects all the logs from this application:
|
||||
|
||||
```logsql
|
||||
_stream:{job="app-42",instance="host-123:5678"}
|
||||
```
|
||||
|
||||
If the number of returned logs is too big, it is recommended adding [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter)
|
||||
to the query in order to reduce the number of matching logs. For example, the following query returns logs for the given application for the last day:
|
||||
|
||||
```logsql
|
||||
_stream:{job="app-42",instance="host-123:5678"} _time:1d
|
||||
```
|
||||
|
||||
If the number of returned logs is still too big, then consider adding more specific [filters](https://docs.victoriametrics.com/victorialogs/logsql/#filters)
|
||||
to the query. For example, the following query selects logs from the given [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields),
|
||||
which contain `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field),
|
||||
over the last day:
|
||||
|
||||
```logsql
|
||||
_stream:{job="app-42",instance="host-123:5678"} error _time:1d
|
||||
```
|
||||
|
||||
The logs are returned in arbitrary order because of performance reasons. Use [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
|
||||
for sorting the returned logs by the needed fields. For example, the following query sorts the selected logs
|
||||
by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field):
|
||||
|
||||
```logsql
|
||||
_stream:{job="app-42",instance="host-123:5678"} _time:1d | sort by (_time)
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [How to determine applications with the most logs?](#how-to-determine-applications-with-the-most-logs)
|
||||
- [How to skip logs with the given word in log message?](#how-to-skip-logs-with-the-given-word-in-log-message)
|
||||
|
||||
|
||||
## How to count the number of matching logs?
|
||||
|
||||
Use [`count()` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count-stats). For example, the following query returns
|
||||
the number of results returned by `your_query_here`:
|
||||
|
||||
```logsql
|
||||
your_query_here | count()
|
||||
```
|
||||
|
||||
## How to determine applications with the most logs?
|
||||
|
||||
[Run](https://docs.victoriametrics.com/victorialogs/querying/) the following query:
|
||||
|
||||
```logsql
|
||||
_time:5m | stats by (_stream) count() as logs | sort by (logs desc) | limit 10
|
||||
```
|
||||
|
||||
This query returns top 10 application instances (aka [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields))
|
||||
with the most logs over the last 5 minutes.
|
||||
|
||||
This query uses the following [LogsQL](https://docs.victoriametrics.com/victorialogs/logsql/) features:
|
||||
|
||||
- [`_time` filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) for selecting logs on the given time range (5 minutes in the query above).
|
||||
- [`stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#stats-pipe) for calculating the number of logs.
|
||||
per each [`_stream`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). [`count` stats function](https://docs.victoriametrics.com/victorialogs/logsql/#count-stats)
|
||||
is used for calculating the needed stats.
|
||||
- [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) for sorting the stats by `logs` field in descending order.
|
||||
- [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) for limiting the number of returned results to 10.
|
||||
|
||||
See also:
|
||||
|
||||
- [How to filter out data after stats calculation?](#how-to-filter-out-data-after-stats-calculation)
|
||||
- [How to calculate the number of logs per the given interval?](#how-to-calculate-the-number-of-logs-per-the-given-interval)
|
||||
- [How to select logs from the given application instance?](#how-to-select-logs-from-the-given-application-instance)
|
||||
|
||||
|
||||
## How to parse JSON inside log message?
|
||||
|
||||
It is better from performance and resource usage PoV to avoid storing JSON inside [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field).
|
||||
It is recommended storing individual JSON fields and log fields instead according to [VictoriaLogs data model](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
||||
If you have to store JSON inside log message or inside any other [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model),
|
||||
then the stored JSON can be parsed during query time via [`unpack_json` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unpack_json-pipe).
|
||||
For example, the following query unpacks JSON from the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
|
||||
across all the logs for the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | unpack_json
|
||||
```
|
||||
|
||||
If you need to parse JSON array, then take a look at [`unroll` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#unroll-pipe).
|
||||
|
||||
|
||||
## How to extract some data from text log message?
|
||||
|
||||
Use [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe) or [`extract_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) pipe.
|
||||
For example, the following query extracts `username` and `user_id` fields from text [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
|
||||
```logsql
|
||||
_time:5m | extract "username=<username>, user_id=<user_id>,"
|
||||
```
|
||||
|
||||
See also:
|
||||
|
||||
- [Replacing substrings in text fields](https://docs.victoriametrics.com/victorialogs/logsql/#replace-pipe)
|
||||
|
||||
|
||||
## How to filter out data after stats calculation?
|
||||
|
||||
Use [`filter` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#filter-pipe). For example, the following query
|
||||
returns only [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) with more than 1000 logs
|
||||
over the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | stats by (_stream) count() rows | filter rows:>1000
|
||||
```
|
||||
|
||||
## How to calculate the number of logs per the given interval?
|
||||
|
||||
Use [`stats` by time bucket](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-time-buckets). For example, the following query
|
||||
returns per-hour number of logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) for the last day:
|
||||
|
||||
```logsq
|
||||
_time:1d error | stats by (_time:1h) count() rows | sort by (_time)
|
||||
```
|
||||
|
||||
This query uses [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) in order to sort per-hour stats
|
||||
by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).
|
||||
|
||||
|
||||
## How to calculate the number of logs per every value of the given field?
|
||||
|
||||
Use [`stats` by field](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-fields). For example, the following query
|
||||
calculates the number of logs per `level` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for logs over the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | stats by (level) count() rows
|
||||
```
|
||||
|
||||
An alternative is to use [`field_values` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#field_values-pipe):
|
||||
|
||||
```logsql
|
||||
_time:5m | field_values level
|
||||
```
|
||||
|
||||
## How to get unique values for the given field?
|
||||
|
||||
Use [`uniq` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe). For example, the following query returns unique values for the `ip` field
|
||||
over logs for the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | uniq by (ip)
|
||||
```
|
||||
|
||||
## How to get unique sets of values for the given fields?
|
||||
|
||||
Use [`uniq` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#uniq-pipe). For example, the following query returns unique sets for (`host`, `path`) fields
|
||||
over logs for the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m | uniq by (host, path)
|
||||
```
|
||||
|
||||
## How to return last N logs for the given query?
|
||||
|
||||
Use [`sort` pipe with limit](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe). For example, the following query returns the last 10 logs with the `error`
|
||||
[word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field)
|
||||
over the logs for the last 5 minutes:
|
||||
|
||||
```logsql
|
||||
_time:5m error | sort by (_time desc) limit 10
|
||||
```
|
||||
|
||||
It sorts the matching logs by [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) in descending order and then selects
|
||||
the first 10 logs with the highest values for the `_time` field.
|
||||
|
||||
If the query is sent to [`/select/logsql/query` HTTP API](https://docs.victoriametrics.com/victorialogs/querying/#querying-logs), then `limit=N` query arg
|
||||
can be passed to it in order to return up to `N` latest log entries. For example, the following command returns up to 10 latest log entries with the `error` word:
|
||||
|
||||
```sh
|
||||
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
|
||||
```
|
|
@ -58,12 +58,14 @@ By default the `/select/logsql/query` returns all the log entries matching the g
|
|||
|
||||
- By closing the response stream at any time. VictoriaLogs stops query execution and frees all the resources occupied by the request as soon as it detects closed client connection.
|
||||
So it is safe running [`*` query](https://docs.victoriametrics.com/victorialogs/logsql/#any-value-filter), which selects all the logs, even if trillions of logs are stored in VictoriaLogs.
|
||||
- By specifying the maximum number of log entries, which can be returned in the response via `limit` query arg. For example, the following request returns
|
||||
up to 10 matching log entries:
|
||||
- By specifying the maximum number of log entries, which can be returned in the response via `limit` query arg. For example, the following command returns
|
||||
up to 10 most recently added log entries with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
|
||||
in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
```sh
|
||||
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
|
||||
```
|
||||
- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example:
|
||||
- By adding [`limit` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe) to the query. For example, the following command returns up to 10 **random** log entries
|
||||
with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) in the [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field):
|
||||
```sh
|
||||
curl http://localhost:9428/select/logsql/query -d 'query=error | limit 10'
|
||||
```
|
||||
|
@ -87,8 +89,11 @@ This allows post-processing the returned lines at the client side with the usual
|
|||
without worrying about resource usage at VictoriaLogs side. See [these docs](#command-line) for more details.
|
||||
|
||||
The returned lines aren't sorted by default, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
||||
Query results can be sorted either at VictoriaLogs side via [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe)
|
||||
or at client side with the usual `sort` command according to [these docs](#command-line).
|
||||
Query results can be sorted in the following ways:
|
||||
|
||||
- By passing `limit=N` query arg to `/select/logsql/query`. The up to `N` most recent matching log entries are returned in the response.
|
||||
- By adding [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) to the query.
|
||||
- By using Unix `sort` command at client side according to [these docs](#command-line).
|
||||
|
||||
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/victorialogs/#multitenancy) is queried.
|
||||
If you need querying other tenant, then specify it via `AccountID` and `ProjectID` http request headers. For example, the following query searches
|
||||
|
|
|
@ -1226,7 +1226,7 @@ func (br *blockResult) getBucketedValue(s string, bf *byStatsField) string {
|
|||
|
||||
buf := br.a.b
|
||||
bufLen := len(buf)
|
||||
buf = marshalDuration(buf, nsecs)
|
||||
buf = marshalDurationString(buf, nsecs)
|
||||
br.a.b = buf
|
||||
return bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
}
|
||||
|
|
|
@ -114,16 +114,14 @@ func (fa *filterAnd) getByFieldTokens() []fieldTokens {
|
|||
|
||||
func (fa *filterAnd) initByFieldTokens() {
|
||||
m := make(map[string]map[string]struct{})
|
||||
byFieldFilters := make(map[string]int)
|
||||
var fieldNames []string
|
||||
|
||||
mergeFieldTokens := func(fieldName string, tokens []string) {
|
||||
fieldName = getCanonicalColumnName(fieldName)
|
||||
byFieldFilters[fieldName]++
|
||||
if len(tokens) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
fieldName = getCanonicalColumnName(fieldName)
|
||||
mTokens, ok := m[fieldName]
|
||||
if !ok {
|
||||
fieldNames = append(fieldNames, fieldName)
|
||||
|
@ -165,11 +163,6 @@ func (fa *filterAnd) initByFieldTokens() {
|
|||
|
||||
var byFieldTokens []fieldTokens
|
||||
for _, fieldName := range fieldNames {
|
||||
if byFieldFilters[fieldName] < 2 {
|
||||
// It is faster to perform bloom filter match inline when visiting the corresponding column
|
||||
continue
|
||||
}
|
||||
|
||||
mTokens := m[fieldName]
|
||||
tokens := make([]string, 0, len(mTokens))
|
||||
for token := range mTokens {
|
||||
|
|
|
@ -5,7 +5,7 @@ type filterNoop struct {
|
|||
}
|
||||
|
||||
func (fn *filterNoop) String() string {
|
||||
return ""
|
||||
return "*"
|
||||
}
|
||||
|
||||
func (fn *filterNoop) updateNeededFields(_ fieldsSet) {
|
||||
|
|
|
@ -127,16 +127,14 @@ func (fo *filterOr) getByFieldTokens() []fieldTokens {
|
|||
|
||||
func (fo *filterOr) initByFieldTokens() {
|
||||
m := make(map[string][][]string)
|
||||
byFieldFilters := make(map[string]int)
|
||||
var fieldNames []string
|
||||
|
||||
mergeFieldTokens := func(fieldName string, tokens []string) {
|
||||
fieldName = getCanonicalColumnName(fieldName)
|
||||
byFieldFilters[fieldName]++
|
||||
if len(tokens) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
fieldName = getCanonicalColumnName(fieldName)
|
||||
if _, ok := m[fieldName]; !ok {
|
||||
fieldNames = append(fieldNames, fieldName)
|
||||
}
|
||||
|
@ -173,11 +171,6 @@ func (fo *filterOr) initByFieldTokens() {
|
|||
|
||||
var byFieldTokens []fieldTokens
|
||||
for _, fieldName := range fieldNames {
|
||||
if byFieldFilters[fieldName] < 2 {
|
||||
// It is faster to perform bloom filter match inline when visiting the corresponding column
|
||||
continue
|
||||
}
|
||||
|
||||
commonTokens := getCommonTokens(m[fieldName])
|
||||
if len(commonTokens) > 0 {
|
||||
byFieldTokens = append(byFieldTokens, fieldTokens{
|
||||
|
|
|
@ -15,6 +15,10 @@ func (p *logfmtParser) reset() {
|
|||
}
|
||||
|
||||
func (p *logfmtParser) addField(name, value string) {
|
||||
name = strings.TrimSpace(name)
|
||||
if name == "" && value == "" {
|
||||
return
|
||||
}
|
||||
p.fields = append(p.fields, Field{
|
||||
Name: name,
|
||||
Value: value,
|
||||
|
@ -22,16 +26,24 @@ func (p *logfmtParser) addField(name, value string) {
|
|||
}
|
||||
|
||||
func (p *logfmtParser) parse(s string) {
|
||||
p.reset()
|
||||
for {
|
||||
// Search for field name
|
||||
n := strings.IndexByte(s, '=')
|
||||
n := strings.IndexAny(s, "= ")
|
||||
if n < 0 {
|
||||
// field name couldn't be read
|
||||
// empty value
|
||||
p.addField(s, "")
|
||||
return
|
||||
}
|
||||
|
||||
name := strings.TrimSpace(s[:n])
|
||||
name := s[:n]
|
||||
ch := s[n]
|
||||
s = s[n+1:]
|
||||
if ch == ' ' {
|
||||
// empty value
|
||||
p.addField(name, "")
|
||||
continue
|
||||
}
|
||||
if len(s) == 0 {
|
||||
p.addField(name, "")
|
||||
return
|
||||
|
|
|
@ -12,7 +12,7 @@ func TestLogfmtParser(t *testing.T) {
|
|||
defer putLogfmtParser(p)
|
||||
|
||||
p.parse(s)
|
||||
result := marshalFieldsToJSON(nil, p.fields)
|
||||
result := MarshalFieldsToJSON(nil, p.fields)
|
||||
if string(result) != resultExpected {
|
||||
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
|
||||
}
|
||||
|
@ -22,9 +22,9 @@ func TestLogfmtParser(t *testing.T) {
|
|||
f(`foo=bar`, `{"foo":"bar"}`)
|
||||
f(`foo="bar=baz x=y"`, `{"foo":"bar=baz x=y"}`)
|
||||
f(`foo=`, `{"foo":""}`)
|
||||
f(`foo`, `{"foo":""}`)
|
||||
f(`foo bar`, `{"foo":"","bar":""}`)
|
||||
f(`foo bar=baz`, `{"foo":"","bar":"baz"}`)
|
||||
f(`foo=bar baz="x y" a=b`, `{"foo":"bar","baz":"x y","a":"b"}`)
|
||||
|
||||
// errors
|
||||
f(`foo`, `{}`)
|
||||
f(`foo=bar baz=x z qwe`, `{"foo":"bar","baz":"x"}`)
|
||||
f(` foo=bar baz=x =z qwe`, `{"foo":"bar","baz":"x","":"z","qwe":""}`)
|
||||
}
|
||||
|
|
|
@ -243,8 +243,8 @@ func (q *Query) String() string {
|
|||
func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
|
||||
{
|
||||
// add 'stats by (_time:step offset off, fields) count() hits'
|
||||
stepStr := string(marshalDuration(nil, step))
|
||||
offsetStr := string(marshalDuration(nil, off))
|
||||
stepStr := string(marshalDurationString(nil, step))
|
||||
offsetStr := string(marshalDurationString(nil, off))
|
||||
byFieldsStr := "_time:" + stepStr + " offset " + offsetStr
|
||||
for _, f := range fields {
|
||||
byFieldsStr += ", " + quoteTokenIfNeeded(f)
|
||||
|
@ -279,6 +279,38 @@ func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
|
|||
}
|
||||
}
|
||||
|
||||
// Clone returns a copy of q.
|
||||
func (q *Query) Clone() *Query {
|
||||
qStr := q.String()
|
||||
qCopy, err := ParseQuery(qStr)
|
||||
if err != nil {
|
||||
logger.Panicf("BUG: cannot parse %q: %s", qStr, err)
|
||||
}
|
||||
return qCopy
|
||||
}
|
||||
|
||||
// CanReturnLastNResults returns true if time range filter at q can be adjusted for returning the last N results.
|
||||
func (q *Query) CanReturnLastNResults() bool {
|
||||
for _, p := range q.pipes {
|
||||
switch p.(type) {
|
||||
case *pipeFieldNames,
|
||||
*pipeFieldValues,
|
||||
*pipeLimit,
|
||||
*pipeOffset,
|
||||
*pipeSort,
|
||||
*pipeStats,
|
||||
*pipeUniq:
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// GetFilterTimeRange returns filter time range for the given q.
|
||||
func (q *Query) GetFilterTimeRange() (int64, int64) {
|
||||
return getFilterTimeRange(q.f)
|
||||
}
|
||||
|
||||
// AddTimeFilter adds global filter _time:[start ... end] to q.
|
||||
func (q *Query) AddTimeFilter(start, end int64) {
|
||||
startStr := marshalTimestampRFC3339NanoString(nil, start)
|
||||
|
@ -1394,12 +1426,12 @@ func parseFilterTime(lex *lexer) (*filterTime, error) {
|
|||
sLower := strings.ToLower(s)
|
||||
if sLower == "now" || startsWithYear(s) {
|
||||
// Parse '_time:YYYY-MM-DD', which transforms to '_time:[YYYY-MM-DD, YYYY-MM-DD+1)'
|
||||
t, err := promutils.ParseTimeAt(s, float64(lex.currentTimestamp)/1e9)
|
||||
nsecs, err := promutils.ParseTimeAt(s, lex.currentTimestamp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse _time filter: %w", err)
|
||||
}
|
||||
// Round to milliseconds
|
||||
startTime := int64(math.Round(t*1e3)) * 1e6
|
||||
startTime := nsecs
|
||||
endTime := getMatchingEndTime(startTime, s)
|
||||
ft := &filterTime{
|
||||
minTimestamp: startTime,
|
||||
|
@ -1549,12 +1581,11 @@ func parseTime(lex *lexer) (int64, string, error) {
|
|||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
t, err := promutils.ParseTimeAt(s, float64(lex.currentTimestamp)/1e9)
|
||||
nsecs, err := promutils.ParseTimeAt(s, lex.currentTimestamp)
|
||||
if err != nil {
|
||||
return 0, "", err
|
||||
}
|
||||
// round to milliseconds
|
||||
return int64(math.Round(t*1e3)) * 1e6, s, nil
|
||||
return nsecs, s, nil
|
||||
}
|
||||
|
||||
func quoteStringTokenIfNeeded(s string) string {
|
||||
|
|
|
@ -1031,7 +1031,7 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
sum(duration) if (host:in('foo.com', 'bar.com') and path:/foobar) as bar`,
|
||||
`* | stats by (_time:1d offset -2h, f2) count(*) if (is_admin:true or "foo bar"*) as foo, sum(duration) if (host:in(foo.com,bar.com) path:"/foobar") as bar`)
|
||||
f(`* | stats count(x) if (error ip:in(_time:1d | fields ip)) rows`, `* | stats count(x) if (error ip:in(_time:1d | fields ip)) as rows`)
|
||||
f(`* | stats count() if () rows`, `* | stats count(*) if () as rows`)
|
||||
f(`* | stats count() if () rows`, `* | stats count(*) if (*) as rows`)
|
||||
|
||||
// sort pipe
|
||||
f(`* | sort`, `* | sort`)
|
||||
|
@ -1832,3 +1832,72 @@ func TestQueryGetNeededColumns(t *testing.T) {
|
|||
f(`* | unroll (a, b) | count() r1`, `a,b`, ``)
|
||||
f(`* | unroll if (q:w p:a) (a, b) | count() r1`, `a,b,p,q`, ``)
|
||||
}
|
||||
|
||||
func TestQueryClone(t *testing.T) {
|
||||
f := func(qStr string) {
|
||||
t.Helper()
|
||||
|
||||
q, err := ParseQuery(qStr)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
qCopy := q.Clone()
|
||||
qCopyStr := qCopy.String()
|
||||
if qStr != qCopyStr {
|
||||
t.Fatalf("unexpected cloned query\ngot\n%s\nwant\n%s", qCopyStr, qStr)
|
||||
}
|
||||
}
|
||||
|
||||
f("*")
|
||||
f("error")
|
||||
f("_time:5m error | fields foo, bar")
|
||||
f("ip:in(foo | fields user_ip) bar | stats by (x:1h, y) count(*) if (user_id:in(q:w | fields abc)) as ccc")
|
||||
}
|
||||
|
||||
func TestQueryGetFilterTimeRange(t *testing.T) {
|
||||
f := func(qStr string, startExpected, endExpected int64) {
|
||||
t.Helper()
|
||||
|
||||
q, err := ParseQuery(qStr)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
start, end := q.GetFilterTimeRange()
|
||||
if start != startExpected || end != endExpected {
|
||||
t.Fatalf("unexpected filter time range; got [%d, %d]; want [%d, %d]", start, end, startExpected, endExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("*", -9223372036854775808, 9223372036854775807)
|
||||
f("_time:2024-05-31T10:20:30.456789123Z", 1717150830456789123, 1717150830456789123)
|
||||
f("_time:2024-05-31", 1717113600000000000, 1717199999999999999)
|
||||
}
|
||||
|
||||
func TestQueryCanReturnLastNResults(t *testing.T) {
|
||||
f := func(qStr string, resultExpected bool) {
|
||||
t.Helper()
|
||||
|
||||
q, err := ParseQuery(qStr)
|
||||
if err != nil {
|
||||
t.Fatalf("cannot parse [%s]: %s", qStr, err)
|
||||
}
|
||||
result := q.CanReturnLastNResults()
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for CanRetrurnLastNResults(%q); got %v; want %v", qStr, result, resultExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("*", true)
|
||||
f("error", true)
|
||||
f("error | fields foo | filter foo:bar", true)
|
||||
f("error | extract '<foo>bar<baz>'", true)
|
||||
f("* | rm x", true)
|
||||
f("* | stats count() rows", false)
|
||||
f("* | sort by (x)", false)
|
||||
f("* | limit 10", false)
|
||||
f("* | offset 10", false)
|
||||
f("* | uniq (x)", false)
|
||||
f("* | field_names", false)
|
||||
f("* | field_values x", false)
|
||||
|
||||
}
|
||||
|
|
|
@ -106,6 +106,12 @@ func parsePipe(lex *lexer) (pipe, error) {
|
|||
return nil, fmt.Errorf("cannot parse 'delete' pipe: %w", err)
|
||||
}
|
||||
return pd, nil
|
||||
case lex.isKeyword("drop_empty_fields"):
|
||||
pd, err := parsePipeDropEmptyFields(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'drop_empty_fields' pipe: %w", err)
|
||||
}
|
||||
return pd, nil
|
||||
case lex.isKeyword("extract"):
|
||||
pe, err := parsePipeExtract(lex)
|
||||
if err != nil {
|
||||
|
@ -220,6 +226,12 @@ func parsePipe(lex *lexer) (pipe, error) {
|
|||
return nil, fmt.Errorf("cannot parse 'unpack_logfmt' pipe: %w", err)
|
||||
}
|
||||
return pu, nil
|
||||
case lex.isKeyword("unpack_syslog"):
|
||||
pu, err := parsePipeUnpackSyslog(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'unpack_syslog' pipe: %w", err)
|
||||
}
|
||||
return pu, nil
|
||||
case lex.isKeyword("unroll"):
|
||||
pu, err := parsePipeUnroll(lex)
|
||||
if err != nil {
|
||||
|
|
223
lib/logstorage/pipe_drop_empty_fields.go
Normal file
223
lib/logstorage/pipe_drop_empty_fields.go
Normal file
|
@ -0,0 +1,223 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
// pipeDropEmptyFields processes '| drop_empty_fields ...' pipe.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe
|
||||
type pipeDropEmptyFields struct {
|
||||
}
|
||||
|
||||
func (pd *pipeDropEmptyFields) String() string {
|
||||
return "drop_empty_fields"
|
||||
}
|
||||
|
||||
func (pd *pipeDropEmptyFields) optimize() {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
func (pd *pipeDropEmptyFields) hasFilterInWithQuery() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (pd *pipeDropEmptyFields) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
|
||||
return pd, nil
|
||||
}
|
||||
|
||||
func (pd *pipeDropEmptyFields) updateNeededFields(_, _ fieldsSet) {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
func (pd *pipeDropEmptyFields) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||
return &pipeDropEmptyFieldsProcessor{
|
||||
ppNext: ppNext,
|
||||
|
||||
shards: make([]pipeDropEmptyFieldsProcessorShard, workersCount),
|
||||
}
|
||||
}
|
||||
|
||||
type pipeDropEmptyFieldsProcessor struct {
|
||||
ppNext pipeProcessor
|
||||
|
||||
shards []pipeDropEmptyFieldsProcessorShard
|
||||
}
|
||||
|
||||
type pipeDropEmptyFieldsProcessorShard struct {
|
||||
pipeDropEmptyFieldsProcessorShardNopad
|
||||
|
||||
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
||||
_ [128 - unsafe.Sizeof(pipeDropEmptyFieldsProcessorShardNopad{})%128]byte
|
||||
}
|
||||
|
||||
type pipeDropEmptyFieldsProcessorShardNopad struct {
|
||||
columnValues [][]string
|
||||
fields []Field
|
||||
|
||||
wctx pipeDropEmptyFieldsWriteContext
|
||||
}
|
||||
|
||||
func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||
if len(br.timestamps) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
shard := &pdp.shards[workerID]
|
||||
|
||||
cs := br.getColumns()
|
||||
|
||||
shard.columnValues = slicesutil.SetLength(shard.columnValues, len(cs))
|
||||
columnValues := shard.columnValues
|
||||
for i, c := range cs {
|
||||
columnValues[i] = c.getValues(br)
|
||||
}
|
||||
|
||||
if !hasEmptyValues(columnValues) {
|
||||
// Fast path - just write br to ppNext, since it has no empty values.
|
||||
pdp.ppNext.writeBlock(workerID, br)
|
||||
return
|
||||
}
|
||||
|
||||
// Slow path - drop fields with empty values
|
||||
shard.wctx.init(workerID, pdp.ppNext)
|
||||
|
||||
fields := shard.fields
|
||||
for rowIdx := range br.timestamps {
|
||||
fields = fields[:0]
|
||||
for i, values := range columnValues {
|
||||
v := values[rowIdx]
|
||||
if v == "" {
|
||||
continue
|
||||
}
|
||||
fields = append(fields, Field{
|
||||
Name: cs[i].name,
|
||||
Value: values[rowIdx],
|
||||
})
|
||||
}
|
||||
shard.wctx.writeRow(fields)
|
||||
}
|
||||
shard.fields = fields
|
||||
|
||||
shard.wctx.flush()
|
||||
}
|
||||
|
||||
func (pdp *pipeDropEmptyFieldsProcessor) flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type pipeDropEmptyFieldsWriteContext struct {
|
||||
workerID uint
|
||||
ppNext pipeProcessor
|
||||
|
||||
rcs []resultColumn
|
||||
br blockResult
|
||||
|
||||
// rowsCount is the number of rows in the current block
|
||||
rowsCount int
|
||||
|
||||
// valuesLen is the total length of values in the current block
|
||||
valuesLen int
|
||||
}
|
||||
|
||||
func (wctx *pipeDropEmptyFieldsWriteContext) reset() {
|
||||
wctx.workerID = 0
|
||||
wctx.ppNext = nil
|
||||
|
||||
rcs := wctx.rcs
|
||||
for i := range rcs {
|
||||
rcs[i].reset()
|
||||
}
|
||||
wctx.rcs = rcs[:0]
|
||||
|
||||
wctx.rowsCount = 0
|
||||
wctx.valuesLen = 0
|
||||
}
|
||||
|
||||
func (wctx *pipeDropEmptyFieldsWriteContext) init(workerID uint, ppNext pipeProcessor) {
|
||||
wctx.reset()
|
||||
|
||||
wctx.workerID = workerID
|
||||
wctx.ppNext = ppNext
|
||||
}
|
||||
|
||||
func (wctx *pipeDropEmptyFieldsWriteContext) writeRow(fields []Field) {
|
||||
if len(fields) == 0 {
|
||||
// skip rows without non-empty fields
|
||||
return
|
||||
}
|
||||
|
||||
rcs := wctx.rcs
|
||||
|
||||
areEqualColumns := len(rcs) == len(fields)
|
||||
if areEqualColumns {
|
||||
for i, f := range fields {
|
||||
if rcs[i].name != f.Name {
|
||||
areEqualColumns = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if !areEqualColumns {
|
||||
// send the current block to ppNext and construct a block with new set of columns
|
||||
wctx.flush()
|
||||
|
||||
rcs = wctx.rcs[:0]
|
||||
for _, f := range fields {
|
||||
rcs = appendResultColumnWithName(rcs, f.Name)
|
||||
}
|
||||
wctx.rcs = rcs
|
||||
}
|
||||
|
||||
for i, f := range fields {
|
||||
v := f.Value
|
||||
rcs[i].addValue(v)
|
||||
wctx.valuesLen += len(v)
|
||||
}
|
||||
|
||||
wctx.rowsCount++
|
||||
if wctx.valuesLen >= 1_000_000 {
|
||||
wctx.flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (wctx *pipeDropEmptyFieldsWriteContext) flush() {
|
||||
rcs := wctx.rcs
|
||||
|
||||
wctx.valuesLen = 0
|
||||
|
||||
// Flush rcs to ppNext
|
||||
br := &wctx.br
|
||||
br.setResultColumns(rcs, wctx.rowsCount)
|
||||
wctx.rowsCount = 0
|
||||
wctx.ppNext.writeBlock(wctx.workerID, br)
|
||||
br.reset()
|
||||
for i := range rcs {
|
||||
rcs[i].resetValues()
|
||||
}
|
||||
}
|
||||
|
||||
func parsePipeDropEmptyFields(lex *lexer) (*pipeDropEmptyFields, error) {
|
||||
if !lex.isKeyword("drop_empty_fields") {
|
||||
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "drop_empty_fields")
|
||||
}
|
||||
lex.nextToken()
|
||||
|
||||
pd := &pipeDropEmptyFields{}
|
||||
|
||||
return pd, nil
|
||||
}
|
||||
|
||||
func hasEmptyValues(columnValues [][]string) bool {
|
||||
for _, values := range columnValues {
|
||||
for _, v := range values {
|
||||
if v == "" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
94
lib/logstorage/pipe_drop_empty_fields_test.go
Normal file
94
lib/logstorage/pipe_drop_empty_fields_test.go
Normal file
|
@ -0,0 +1,94 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParsePipeDropEmptyFieldsSuccess(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParsePipeSuccess(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`drop_empty_fields`)
|
||||
}
|
||||
|
||||
func TestParsePipeDropEmptyFieldsFailure(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParsePipeFailure(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`drop_empty_fields foo`)
|
||||
}
|
||||
|
||||
func TestPipeDropEmptyFields(t *testing.T) {
|
||||
f := func(pipeStr string, rows, rowsExpected [][]Field) {
|
||||
t.Helper()
|
||||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
f(`drop_empty_fields`, [][]Field{
|
||||
{
|
||||
{"a", "foo"},
|
||||
{"b", "bar"},
|
||||
{"c", "baz"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"a", "foo"},
|
||||
{"b", "bar"},
|
||||
{"c", "baz"},
|
||||
},
|
||||
})
|
||||
f(`drop_empty_fields`, [][]Field{
|
||||
{
|
||||
{"a", "foo"},
|
||||
{"b", "bar"},
|
||||
{"c", "baz"},
|
||||
},
|
||||
{
|
||||
{"a", "foo1"},
|
||||
{"b", ""},
|
||||
{"c", "baz1"},
|
||||
},
|
||||
{
|
||||
{"a", ""},
|
||||
{"b", "bar2"},
|
||||
},
|
||||
{
|
||||
{"a", ""},
|
||||
{"b", ""},
|
||||
{"c", ""},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"a", "foo"},
|
||||
{"b", "bar"},
|
||||
{"c", "baz"},
|
||||
},
|
||||
{
|
||||
{"a", "foo1"},
|
||||
{"c", "baz1"},
|
||||
},
|
||||
{
|
||||
{"b", "bar2"},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestPipeDropEmptyFieldsUpdateNeededFields(t *testing.T) {
|
||||
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
||||
t.Helper()
|
||||
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
|
||||
}
|
||||
|
||||
// all the needed fields
|
||||
f(`drop_empty_fields`, "*", "", "*", "")
|
||||
|
||||
// non-empty unneeded fields
|
||||
f(`drop_empty_fields`, "*", "f1,f2", "*", "f1,f2")
|
||||
|
||||
// non-empty needed fields
|
||||
f(`drop_empty_fields`, "x,y", "", "x,y", "")
|
||||
}
|
|
@ -126,7 +126,7 @@ func (ppp *pipePackJSONProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
}
|
||||
|
||||
bufLen := len(buf)
|
||||
buf = marshalFieldsToJSON(buf, fields)
|
||||
buf = MarshalFieldsToJSON(buf, fields)
|
||||
v := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
shard.rc.addValue(v)
|
||||
}
|
||||
|
|
|
@ -57,7 +57,6 @@ func updateNeededFieldsForUnpackPipe(fromField string, outFields []string, keepO
|
|||
}
|
||||
|
||||
type fieldsUnpackerContext struct {
|
||||
workerID uint
|
||||
fieldPrefix string
|
||||
|
||||
fields []Field
|
||||
|
@ -65,7 +64,6 @@ type fieldsUnpackerContext struct {
|
|||
}
|
||||
|
||||
func (uctx *fieldsUnpackerContext) reset() {
|
||||
uctx.workerID = 0
|
||||
uctx.fieldPrefix = ""
|
||||
uctx.resetFields()
|
||||
uctx.a.reset()
|
||||
|
@ -76,10 +74,9 @@ func (uctx *fieldsUnpackerContext) resetFields() {
|
|||
uctx.fields = uctx.fields[:0]
|
||||
}
|
||||
|
||||
func (uctx *fieldsUnpackerContext) init(workerID uint, fieldPrefix string) {
|
||||
func (uctx *fieldsUnpackerContext) init(fieldPrefix string) {
|
||||
uctx.reset()
|
||||
|
||||
uctx.workerID = workerID
|
||||
uctx.fieldPrefix = fieldPrefix
|
||||
}
|
||||
|
||||
|
@ -157,7 +154,7 @@ func (pup *pipeUnpackProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
|
||||
shard := &pup.shards[workerID]
|
||||
shard.wctx.init(workerID, pup.ppNext, pup.keepOriginalFields, pup.skipEmptyResults, br)
|
||||
shard.uctx.init(workerID, pup.fieldPrefix)
|
||||
shard.uctx.init(pup.fieldPrefix)
|
||||
|
||||
bm := &shard.bm
|
||||
bm.init(len(br.timestamps))
|
||||
|
@ -234,6 +231,7 @@ func (wctx *pipeUnpackWriteContext) reset() {
|
|||
wctx.workerID = 0
|
||||
wctx.ppNext = nil
|
||||
wctx.keepOriginalFields = false
|
||||
wctx.skipEmptyResults = false
|
||||
|
||||
wctx.brSrc = nil
|
||||
wctx.csSrc = nil
|
||||
|
|
|
@ -191,17 +191,6 @@ func TestPipeUnpackLogfmt(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
// single row, unpack from non-json field
|
||||
f("unpack_logfmt from x", [][]Field{
|
||||
{
|
||||
{"x", `foobar`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `foobar`},
|
||||
},
|
||||
})
|
||||
|
||||
// single row, unpack from non-logfmt
|
||||
f("unpack_logfmt from x", [][]Field{
|
||||
{
|
||||
|
@ -210,6 +199,7 @@ func TestPipeUnpackLogfmt(t *testing.T) {
|
|||
}, [][]Field{
|
||||
{
|
||||
{"x", `foobar`},
|
||||
{"foobar", ""},
|
||||
},
|
||||
})
|
||||
|
||||
|
|
146
lib/logstorage/pipe_unpack_syslog.go
Normal file
146
lib/logstorage/pipe_unpack_syslog.go
Normal file
|
@ -0,0 +1,146 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// pipeUnpackSyslog processes '| unpack_syslog ...' pipe.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#unpack_syslog-pipe
|
||||
type pipeUnpackSyslog struct {
|
||||
// fromField is the field to unpack syslog fields from
|
||||
fromField string
|
||||
|
||||
// resultPrefix is prefix to add to unpacked field names
|
||||
resultPrefix string
|
||||
|
||||
keepOriginalFields bool
|
||||
|
||||
// iff is an optional filter for skipping unpacking syslog
|
||||
iff *ifFilter
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackSyslog) String() string {
|
||||
s := "unpack_syslog"
|
||||
if pu.iff != nil {
|
||||
s += " " + pu.iff.String()
|
||||
}
|
||||
if !isMsgFieldName(pu.fromField) {
|
||||
s += " from " + quoteTokenIfNeeded(pu.fromField)
|
||||
}
|
||||
if pu.resultPrefix != "" {
|
||||
s += " result_prefix " + quoteTokenIfNeeded(pu.resultPrefix)
|
||||
}
|
||||
if pu.keepOriginalFields {
|
||||
s += " keep_original_fields"
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackSyslog) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||
updateNeededFieldsForUnpackPipe(pu.fromField, nil, pu.keepOriginalFields, false, pu.iff, neededFields, unneededFields)
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackSyslog) optimize() {
|
||||
pu.iff.optimizeFilterIn()
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackSyslog) hasFilterInWithQuery() bool {
|
||||
return pu.iff.hasFilterInWithQuery()
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackSyslog) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
|
||||
iffNew, err := pu.iff.initFilterInValues(cache, getFieldValuesFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
puNew := *pu
|
||||
puNew.iff = iffNew
|
||||
return &puNew, nil
|
||||
}
|
||||
|
||||
func (pu *pipeUnpackSyslog) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||
unpackSyslog := func(uctx *fieldsUnpackerContext, s string) {
|
||||
year := currentYear.Load()
|
||||
p := getSyslogParser(int(year))
|
||||
|
||||
p.parse(s)
|
||||
for _, f := range p.fields {
|
||||
uctx.addField(f.Name, f.Value)
|
||||
}
|
||||
|
||||
putSyslogParser(p)
|
||||
}
|
||||
|
||||
return newPipeUnpackProcessor(workersCount, unpackSyslog, ppNext, pu.fromField, pu.resultPrefix, pu.keepOriginalFields, false, pu.iff)
|
||||
}
|
||||
|
||||
var currentYear atomic.Int64
|
||||
|
||||
func init() {
|
||||
year := time.Now().UTC().Year()
|
||||
currentYear.Store(int64(year))
|
||||
go func() {
|
||||
for {
|
||||
t := time.Now().UTC()
|
||||
nextYear := time.Date(t.Year()+1, 1, 1, 0, 0, 0, 0, time.UTC)
|
||||
d := nextYear.Sub(t)
|
||||
time.Sleep(d)
|
||||
year := time.Now().UTC().Year()
|
||||
currentYear.Store(int64(year))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func parsePipeUnpackSyslog(lex *lexer) (*pipeUnpackSyslog, error) {
|
||||
if !lex.isKeyword("unpack_syslog") {
|
||||
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "unpack_syslog")
|
||||
}
|
||||
lex.nextToken()
|
||||
|
||||
var iff *ifFilter
|
||||
if lex.isKeyword("if") {
|
||||
f, err := parseIfFilter(lex)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
iff = f
|
||||
}
|
||||
|
||||
fromField := "_msg"
|
||||
if lex.isKeyword("from") {
|
||||
lex.nextToken()
|
||||
f, err := parseFieldName(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'from' field name: %w", err)
|
||||
}
|
||||
fromField = f
|
||||
}
|
||||
|
||||
resultPrefix := ""
|
||||
if lex.isKeyword("result_prefix") {
|
||||
lex.nextToken()
|
||||
p, err := getCompoundToken(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'result_prefix': %w", err)
|
||||
}
|
||||
resultPrefix = p
|
||||
}
|
||||
|
||||
keepOriginalFields := false
|
||||
if lex.isKeyword("keep_original_fields") {
|
||||
lex.nextToken()
|
||||
keepOriginalFields = true
|
||||
}
|
||||
|
||||
pu := &pipeUnpackSyslog{
|
||||
fromField: fromField,
|
||||
resultPrefix: resultPrefix,
|
||||
keepOriginalFields: keepOriginalFields,
|
||||
iff: iff,
|
||||
}
|
||||
|
||||
return pu, nil
|
||||
}
|
239
lib/logstorage/pipe_unpack_syslog_test.go
Normal file
239
lib/logstorage/pipe_unpack_syslog_test.go
Normal file
|
@ -0,0 +1,239 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParsePipeUnpackSyslogSuccess(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParsePipeSuccess(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`unpack_syslog`)
|
||||
f(`unpack_syslog keep_original_fields`)
|
||||
f(`unpack_syslog if (a:x)`)
|
||||
f(`unpack_syslog if (a:x) keep_original_fields`)
|
||||
f(`unpack_syslog from x`)
|
||||
f(`unpack_syslog from x keep_original_fields`)
|
||||
f(`unpack_syslog if (a:x) from x`)
|
||||
f(`unpack_syslog from x result_prefix abc`)
|
||||
f(`unpack_syslog if (a:x) from x result_prefix abc`)
|
||||
f(`unpack_syslog result_prefix abc`)
|
||||
f(`unpack_syslog if (a:x) result_prefix abc`)
|
||||
}
|
||||
|
||||
func TestParsePipeUnpackSyslogFailure(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParsePipeFailure(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`unpack_syslog foo`)
|
||||
f(`unpack_syslog if`)
|
||||
f(`unpack_syslog if (x:y) foobar`)
|
||||
f(`unpack_syslog from`)
|
||||
f(`unpack_syslog from x y`)
|
||||
f(`unpack_syslog from x if`)
|
||||
f(`unpack_syslog from x result_prefix`)
|
||||
f(`unpack_syslog from x result_prefix a b`)
|
||||
f(`unpack_syslog from x result_prefix a if`)
|
||||
f(`unpack_syslog result_prefix`)
|
||||
f(`unpack_syslog result_prefix a b`)
|
||||
f(`unpack_syslog result_prefix a if`)
|
||||
}
|
||||
|
||||
func TestPipeUnpackSyslog(t *testing.T) {
|
||||
f := func(pipeStr string, rows, rowsExpected [][]Field) {
|
||||
t.Helper()
|
||||
expectPipeResults(t, pipeStr, rows, rowsExpected)
|
||||
}
|
||||
|
||||
// no skip empty results
|
||||
f("unpack_syslog", [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"foo", "321"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"foo", "321"},
|
||||
{"priority", "165"},
|
||||
{"facility", "20"},
|
||||
{"severity", "5"},
|
||||
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
|
||||
{"hostname", "mymachine.example.com"},
|
||||
{"app_name", "appname"},
|
||||
{"proc_id", "12345"},
|
||||
{"msg_id", "ID47"},
|
||||
{"message", "This is a test message with structured data"},
|
||||
},
|
||||
})
|
||||
|
||||
// keep original fields
|
||||
f("unpack_syslog keep_original_fields", [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"foo", "321"},
|
||||
{"app_name", "foobar"},
|
||||
{"msg_id", "baz"},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"foo", "321"},
|
||||
{"priority", "165"},
|
||||
{"facility", "20"},
|
||||
{"severity", "5"},
|
||||
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
|
||||
{"hostname", "mymachine.example.com"},
|
||||
{"app_name", "foobar"},
|
||||
{"proc_id", "12345"},
|
||||
{"msg_id", "baz"},
|
||||
{"message", "This is a test message with structured data"},
|
||||
},
|
||||
})
|
||||
|
||||
// unpack from other field
|
||||
f("unpack_syslog from x", [][]Field{
|
||||
{
|
||||
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"priority", "165"},
|
||||
{"facility", "20"},
|
||||
{"severity", "5"},
|
||||
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
|
||||
{"hostname", "mymachine.example.com"},
|
||||
{"app_name", "appname"},
|
||||
{"proc_id", "12345"},
|
||||
{"msg_id", "ID47"},
|
||||
{"message", "This is a test message with structured data"},
|
||||
},
|
||||
})
|
||||
|
||||
// failed if condition
|
||||
f("unpack_syslog if (foo:bar)", [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
},
|
||||
})
|
||||
|
||||
// matched if condition
|
||||
f("unpack_syslog if (appname)", [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"priority", "165"},
|
||||
{"facility", "20"},
|
||||
{"severity", "5"},
|
||||
{"timestamp", "2023-06-03T17:42:32.123456789Z"},
|
||||
{"hostname", "mymachine.example.com"},
|
||||
{"app_name", "appname"},
|
||||
{"proc_id", "12345"},
|
||||
{"msg_id", "ID47"},
|
||||
{"message", "This is a test message with structured data"},
|
||||
},
|
||||
})
|
||||
|
||||
// single row, unpack from missing field
|
||||
f("unpack_syslog from x", [][]Field{
|
||||
{
|
||||
{"_msg", `foo=bar`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"_msg", `foo=bar`},
|
||||
},
|
||||
})
|
||||
|
||||
// single row, unpack from non-syslog field
|
||||
f("unpack_syslog from x", [][]Field{
|
||||
{
|
||||
{"x", `foobar`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `foobar`},
|
||||
},
|
||||
})
|
||||
|
||||
// multiple rows with distinct number of fields
|
||||
f("unpack_syslog from x result_prefix qwe_", [][]Field{
|
||||
{
|
||||
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
},
|
||||
{
|
||||
{"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`},
|
||||
{"y", `z=bar`},
|
||||
},
|
||||
}, [][]Field{
|
||||
{
|
||||
{"x", `<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data`},
|
||||
{"qwe_priority", "165"},
|
||||
{"qwe_facility", "20"},
|
||||
{"qwe_severity", "5"},
|
||||
{"qwe_timestamp", "2023-06-03T17:42:32.123456789Z"},
|
||||
{"qwe_hostname", "mymachine.example.com"},
|
||||
{"qwe_app_name", "appname"},
|
||||
{"qwe_proc_id", "12345"},
|
||||
{"qwe_msg_id", "ID47"},
|
||||
{"qwe_message", "This is a test message with structured data"},
|
||||
},
|
||||
{
|
||||
{"x", `<163>1 2024-12-13T18:21:43Z mymachine.example.com appname2 345 ID7 - foobar`},
|
||||
{"y", `z=bar`},
|
||||
{"qwe_priority", "163"},
|
||||
{"qwe_facility", "20"},
|
||||
{"qwe_severity", "3"},
|
||||
{"qwe_timestamp", "2024-12-13T18:21:43Z"},
|
||||
{"qwe_hostname", "mymachine.example.com"},
|
||||
{"qwe_app_name", "appname2"},
|
||||
{"qwe_proc_id", "345"},
|
||||
{"qwe_msg_id", "ID7"},
|
||||
{"qwe_message", "foobar"},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func TestPipeUnpackSyslogUpdateNeededFields(t *testing.T) {
|
||||
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
||||
t.Helper()
|
||||
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
|
||||
}
|
||||
|
||||
// all the needed fields
|
||||
f("unpack_syslog", "*", "", "*", "")
|
||||
f("unpack_syslog keep_original_fields", "*", "", "*", "")
|
||||
f("unpack_syslog if (y:z) from x", "*", "", "*", "")
|
||||
|
||||
// all the needed fields, unneeded fields do not intersect with src
|
||||
f("unpack_syslog from x", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_syslog if (y:z) from x", "*", "f1,f2", "*", "f1,f2")
|
||||
f("unpack_syslog if (f1:z) from x", "*", "f1,f2", "*", "f2")
|
||||
|
||||
// all the needed fields, unneeded fields intersect with src
|
||||
f("unpack_syslog from x", "*", "f2,x", "*", "f2")
|
||||
f("unpack_syslog if (y:z) from x", "*", "f2,x", "*", "f2")
|
||||
f("unpack_syslog if (f2:z) from x", "*", "f1,f2,x", "*", "f1")
|
||||
|
||||
// needed fields do not intersect with src
|
||||
f("unpack_syslog from x", "f1,f2", "", "f1,f2,x", "")
|
||||
f("unpack_syslog if (y:z) from x", "f1,f2", "", "f1,f2,x,y", "")
|
||||
f("unpack_syslog if (f1:z) from x", "f1,f2", "", "f1,f2,x", "")
|
||||
|
||||
// needed fields intersect with src
|
||||
f("unpack_syslog from x", "f2,x", "", "f2,x", "")
|
||||
f("unpack_syslog if (y:z) from x", "f2,x", "", "f2,x,y", "")
|
||||
f("unpack_syslog if (f2:z y:qwe) from x", "f2,x", "", "f2,x,y", "")
|
||||
}
|
|
@ -64,7 +64,8 @@ func (f *Field) marshalToJSON(dst []byte) []byte {
|
|||
return dst
|
||||
}
|
||||
|
||||
func marshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
||||
// MarshalFieldsToJSON appends JSON-marshaled fields to dt and returns the result.
|
||||
func MarshalFieldsToJSON(dst []byte, fields []Field) []byte {
|
||||
dst = append(dst, '{')
|
||||
if len(fields) > 0 {
|
||||
dst = fields[0].marshalToJSON(dst)
|
||||
|
|
|
@ -99,7 +99,7 @@ func (sap *statsRowAnyProcessor) updateState(br *blockResult, rowIdx int) int {
|
|||
|
||||
func (sap *statsRowAnyProcessor) finalizeStats() string {
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalFieldsToJSON(bb.B, sap.fields)
|
||||
bb.B = MarshalFieldsToJSON(bb.B, sap.fields)
|
||||
result := string(bb.B)
|
||||
bbPool.Put(bb)
|
||||
|
||||
|
|
|
@ -206,7 +206,7 @@ func (smp *statsRowMaxProcessor) updateState(v string, br *blockResult, rowIdx i
|
|||
|
||||
func (smp *statsRowMaxProcessor) finalizeStats() string {
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalFieldsToJSON(bb.B, smp.fields)
|
||||
bb.B = MarshalFieldsToJSON(bb.B, smp.fields)
|
||||
result := string(bb.B)
|
||||
bbPool.Put(bb)
|
||||
|
||||
|
|
|
@ -206,7 +206,7 @@ func (smp *statsRowMinProcessor) updateState(v string, br *blockResult, rowIdx i
|
|||
|
||||
func (smp *statsRowMinProcessor) finalizeStats() string {
|
||||
bb := bbPool.Get()
|
||||
bb.B = marshalFieldsToJSON(bb.B, smp.fields)
|
||||
bb.B = MarshalFieldsToJSON(bb.B, smp.fields)
|
||||
result := string(bb.B)
|
||||
bbPool.Put(bb)
|
||||
|
||||
|
|
|
@ -969,7 +969,7 @@ func TestParseStreamFieldsSuccess(t *testing.T) {
|
|||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
result := marshalFieldsToJSON(nil, labels)
|
||||
result := MarshalFieldsToJSON(nil, labels)
|
||||
if string(result) != resultExpected {
|
||||
t.Fatalf("unexpected result\ngot\n%s\nwant\n%s", result, resultExpected)
|
||||
}
|
||||
|
|
308
lib/logstorage/syslog_parser.go
Normal file
308
lib/logstorage/syslog_parser.go
Normal file
|
@ -0,0 +1,308 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
)
|
||||
|
||||
func getSyslogParser(currentYear int) *syslogParser {
|
||||
v := syslogParserPool.Get()
|
||||
if v == nil {
|
||||
v = &syslogParser{}
|
||||
}
|
||||
p := v.(*syslogParser)
|
||||
p.currentYear = currentYear
|
||||
return p
|
||||
}
|
||||
|
||||
func putSyslogParser(p *syslogParser) {
|
||||
p.reset()
|
||||
syslogParserPool.Put(p)
|
||||
}
|
||||
|
||||
var syslogParserPool sync.Pool
|
||||
|
||||
type syslogParser struct {
|
||||
currentYear int
|
||||
|
||||
buf []byte
|
||||
fields []Field
|
||||
}
|
||||
|
||||
func (p *syslogParser) reset() {
|
||||
p.currentYear = 0
|
||||
p.resetFields()
|
||||
}
|
||||
|
||||
func (p *syslogParser) resetFields() {
|
||||
p.buf = p.buf[:0]
|
||||
|
||||
clear(p.fields)
|
||||
p.fields = p.fields[:0]
|
||||
}
|
||||
|
||||
func (p *syslogParser) addField(name, value string) {
|
||||
p.fields = append(p.fields, Field{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
}
|
||||
|
||||
func (p *syslogParser) parse(s string) {
|
||||
p.resetFields()
|
||||
|
||||
if len(s) == 0 {
|
||||
// Cannot parse syslog message
|
||||
return
|
||||
}
|
||||
|
||||
if s[0] != '<' {
|
||||
p.parseNoHeader(s)
|
||||
return
|
||||
}
|
||||
|
||||
// parse priority
|
||||
s = s[1:]
|
||||
n := strings.IndexByte(s, '>')
|
||||
if n < 0 {
|
||||
// Cannot parse priority
|
||||
return
|
||||
}
|
||||
priorityStr := s[:n]
|
||||
s = s[n+1:]
|
||||
|
||||
p.addField("priority", priorityStr)
|
||||
priority, ok := tryParseUint64(priorityStr)
|
||||
if !ok {
|
||||
// Cannot parse priority
|
||||
return
|
||||
}
|
||||
facility := priority / 8
|
||||
severity := priority % 8
|
||||
|
||||
bufLen := len(p.buf)
|
||||
p.buf = marshalUint64String(p.buf, facility)
|
||||
p.addField("facility", bytesutil.ToUnsafeString(p.buf[bufLen:]))
|
||||
|
||||
bufLen = len(p.buf)
|
||||
p.buf = marshalUint64String(p.buf, severity)
|
||||
p.addField("severity", bytesutil.ToUnsafeString(p.buf[bufLen:]))
|
||||
|
||||
p.parseNoHeader(s)
|
||||
}
|
||||
|
||||
func (p *syslogParser) parseNoHeader(s string) {
|
||||
if len(s) == 0 {
|
||||
return
|
||||
}
|
||||
if strings.HasPrefix(s, "1 ") {
|
||||
p.parseRFC5424(s[2:])
|
||||
} else {
|
||||
p.parseRFC3164(s)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *syslogParser) parseRFC5424(s string) {
|
||||
// See https://datatracker.ietf.org/doc/html/rfc5424
|
||||
|
||||
if len(s) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Parse timestamp
|
||||
n := strings.IndexByte(s, ' ')
|
||||
if n < 0 {
|
||||
p.addField("timestamp", s)
|
||||
return
|
||||
}
|
||||
p.addField("timestamp", s[:n])
|
||||
s = s[n+1:]
|
||||
|
||||
// Parse hostname
|
||||
n = strings.IndexByte(s, ' ')
|
||||
if n < 0 {
|
||||
p.addField("hostname", s)
|
||||
return
|
||||
}
|
||||
p.addField("hostname", s[:n])
|
||||
s = s[n+1:]
|
||||
|
||||
// Parse app-name
|
||||
n = strings.IndexByte(s, ' ')
|
||||
if n < 0 {
|
||||
p.addField("app_name", s)
|
||||
return
|
||||
}
|
||||
p.addField("app_name", s[:n])
|
||||
s = s[n+1:]
|
||||
|
||||
// Parse procid
|
||||
n = strings.IndexByte(s, ' ')
|
||||
if n < 0 {
|
||||
p.addField("proc_id", s)
|
||||
return
|
||||
}
|
||||
p.addField("proc_id", s[:n])
|
||||
s = s[n+1:]
|
||||
|
||||
// Parse msgID
|
||||
n = strings.IndexByte(s, ' ')
|
||||
if n < 0 {
|
||||
p.addField("msg_id", s)
|
||||
return
|
||||
}
|
||||
p.addField("msg_id", s[:n])
|
||||
s = s[n+1:]
|
||||
|
||||
// Parse structured data
|
||||
tail, ok := p.parseRFC5424SD(s)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
s = tail
|
||||
|
||||
// Parse message
|
||||
p.addField("message", s)
|
||||
}
|
||||
|
||||
func (p *syslogParser) parseRFC5424SD(s string) (string, bool) {
|
||||
if strings.HasPrefix(s, "- ") {
|
||||
return s[2:], true
|
||||
}
|
||||
|
||||
for {
|
||||
tail, ok := p.parseRFC5424SDLine(s)
|
||||
if !ok {
|
||||
return tail, false
|
||||
}
|
||||
s = tail
|
||||
if strings.HasPrefix(s, " ") {
|
||||
s = s[1:]
|
||||
return s, true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *syslogParser) parseRFC5424SDLine(s string) (string, bool) {
|
||||
if len(s) == 0 || s[0] != '[' {
|
||||
return s, false
|
||||
}
|
||||
s = s[1:]
|
||||
|
||||
n := strings.IndexAny(s, " ]")
|
||||
if n < 0 {
|
||||
return s, false
|
||||
}
|
||||
sdID := s[:n]
|
||||
s = s[n:]
|
||||
|
||||
// Parse structured data
|
||||
i := 0
|
||||
for i < len(s) && s[i] != ']' {
|
||||
// skip whitespace
|
||||
if s[i] != ' ' {
|
||||
return s, false
|
||||
}
|
||||
i++
|
||||
|
||||
// Parse name
|
||||
n := strings.IndexByte(s[i:], '=')
|
||||
if n < 0 {
|
||||
return s, false
|
||||
}
|
||||
i += n + 1
|
||||
|
||||
// Parse value
|
||||
qp, err := strconv.QuotedPrefix(s[i:])
|
||||
if err != nil {
|
||||
return s, false
|
||||
}
|
||||
i += len(qp)
|
||||
}
|
||||
if i == len(s) {
|
||||
return s, false
|
||||
}
|
||||
|
||||
sdValue := strings.TrimSpace(s[:i])
|
||||
p.addField(sdID, sdValue)
|
||||
s = s[i+1:]
|
||||
return s, true
|
||||
}
|
||||
|
||||
func (p *syslogParser) parseRFC3164(s string) {
|
||||
// See https://datatracker.ietf.org/doc/html/rfc3164
|
||||
|
||||
// Parse timestamp
|
||||
n := len(time.Stamp)
|
||||
if len(s) < n {
|
||||
return
|
||||
}
|
||||
t, err := time.Parse(time.Stamp, s[:n])
|
||||
if err != nil {
|
||||
// TODO: fall back to parsing ISO8601 timestamp?
|
||||
return
|
||||
}
|
||||
s = s[n:]
|
||||
|
||||
t = t.UTC()
|
||||
t = time.Date(p.currentYear, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
||||
if uint64(t.Unix())-24*3600 > fasttime.UnixTimestamp() {
|
||||
// Adjust time to the previous year
|
||||
t = time.Date(t.Year()-1, t.Month(), t.Day(), t.Hour(), t.Minute(), t.Second(), t.Nanosecond(), time.UTC)
|
||||
}
|
||||
|
||||
bufLen := len(p.buf)
|
||||
p.buf = marshalTimestampISO8601String(p.buf, t.UnixNano())
|
||||
p.addField("timestamp", bytesutil.ToUnsafeString(p.buf[bufLen:]))
|
||||
|
||||
if len(s) == 0 || s[0] != ' ' {
|
||||
// Missing space after the time field
|
||||
return
|
||||
}
|
||||
s = s[1:]
|
||||
|
||||
// Parse hostname
|
||||
n = strings.IndexByte(s, ' ')
|
||||
if n < 0 {
|
||||
p.addField("hostname", s)
|
||||
return
|
||||
}
|
||||
p.addField("hostname", s[:n])
|
||||
s = s[n+1:]
|
||||
|
||||
// Parse tag (aka app_name)
|
||||
n = strings.IndexAny(s, "[: ")
|
||||
if n < 0 {
|
||||
p.addField("app_name", s)
|
||||
return
|
||||
}
|
||||
p.addField("app_name", s[:n])
|
||||
s = s[n:]
|
||||
|
||||
// Parse proc_id
|
||||
if len(s) == 0 {
|
||||
return
|
||||
}
|
||||
if s[0] == '[' {
|
||||
s = s[1:]
|
||||
n = strings.IndexByte(s, ']')
|
||||
if n < 0 {
|
||||
return
|
||||
}
|
||||
p.addField("proc_id", s[:n])
|
||||
s = s[n+1:]
|
||||
}
|
||||
|
||||
// Skip optional ': ' in front of message
|
||||
s = strings.TrimPrefix(s, ":")
|
||||
s = strings.TrimPrefix(s, " ")
|
||||
|
||||
if len(s) > 0 {
|
||||
p.addField("message", s)
|
||||
}
|
||||
}
|
67
lib/logstorage/syslog_parser_test.go
Normal file
67
lib/logstorage/syslog_parser_test.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSyslogParser(t *testing.T) {
|
||||
f := func(s, resultExpected string) {
|
||||
t.Helper()
|
||||
|
||||
const currentYear = 2024
|
||||
p := getSyslogParser(currentYear)
|
||||
defer putSyslogParser(p)
|
||||
|
||||
p.parse(s)
|
||||
result := MarshalFieldsToJSON(nil, p.fields)
|
||||
if string(result) != resultExpected {
|
||||
t.Fatalf("unexpected result when parsing [%s]; got\n%s\nwant\n%s\n", s, result, resultExpected)
|
||||
}
|
||||
}
|
||||
|
||||
// RFC 3164
|
||||
f("Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
|
||||
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
|
||||
f("<165>Jun 3 12:08:33 abcd systemd[1]: Starting Update the local ESM caches...",
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"systemd","proc_id":"1","message":"Starting Update the local ESM caches..."}`)
|
||||
f("Mar 13 12:08:33 abcd systemd: Starting Update the local ESM caches...",
|
||||
`{"timestamp":"2024-03-13T12:08:33.000Z","hostname":"abcd","app_name":"systemd","message":"Starting Update the local ESM caches..."}`)
|
||||
f("Jun 3 12:08:33 abcd - Starting Update the local ESM caches...",
|
||||
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"-","message":"Starting Update the local ESM caches..."}`)
|
||||
f("Jun 3 12:08:33 - - Starting Update the local ESM caches...",
|
||||
`{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"-","app_name":"-","message":"Starting Update the local ESM caches..."}`)
|
||||
|
||||
// RFC 5424
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
|
||||
f(`1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 - This is a test message with structured data.`,
|
||||
`{"timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","message":"This is a test message with structured data."}`)
|
||||
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [exampleSDID@32473 iut="3" eventSource="Application 123 = ] 56" eventID="11211"] This is a test message with structured data.`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","exampleSDID@32473":"iut=\"3\" eventSource=\"Application 123 = ] 56\" eventID=\"11211\"","message":"This is a test message with structured data."}`)
|
||||
f(`<165>1 2023-06-03T17:42:00.000Z mymachine.example.com appname 12345 ID47 [foo@123 iut="3"][bar@456 eventID="11211"] This is a test message with structured data.`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:00.000Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":"iut=\"3\"","bar@456":"eventID=\"11211\"","message":"This is a test message with structured data."}`)
|
||||
|
||||
// Incomplete RFC 3164
|
||||
f("", `{}`)
|
||||
f("Jun 3 12:08:33", `{"timestamp":"2024-06-03T12:08:33.000Z"}`)
|
||||
f("Jun 3 12:08:33 abcd", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd"}`)
|
||||
f("Jun 3 12:08:33 abcd sudo", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo"}`)
|
||||
f("Jun 3 12:08:33 abcd sudo[123]", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","proc_id":"123"}`)
|
||||
f("Jun 3 12:08:33 abcd sudo foobar", `{"timestamp":"2024-06-03T12:08:33.000Z","hostname":"abcd","app_name":"sudo","message":"foobar"}`)
|
||||
|
||||
// Incomplete RFC 5424
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47 [foo@123]`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47","foo@123":""}`)
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345 ID47`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345","msg_id":"ID47"}`)
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname 12345`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname","proc_id":"12345"}`)
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com appname`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com","app_name":"appname"}`)
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z mymachine.example.com`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z","hostname":"mymachine.example.com"}`)
|
||||
f(`<165>1 2023-06-03T17:42:32.123456789Z`,
|
||||
`{"priority":"165","facility":"20","severity":"5","timestamp":"2023-06-03T17:42:32.123456789Z"}`)
|
||||
f(`<165>1 `,
|
||||
`{"priority":"165","facility":"20","severity":"5"}`)
|
||||
}
|
|
@ -860,8 +860,8 @@ func tryParseDuration(s string) (int64, bool) {
|
|||
return nsecs, true
|
||||
}
|
||||
|
||||
// marshalDuration appends string representation of nsec duration to dst and returns the result.
|
||||
func marshalDuration(dst []byte, nsecs int64) []byte {
|
||||
// marshalDurationString appends string representation of nsec duration to dst and returns the result.
|
||||
func marshalDurationString(dst []byte, nsecs int64) []byte {
|
||||
if nsecs == 0 {
|
||||
return append(dst, '0')
|
||||
}
|
||||
|
|
|
@ -378,11 +378,11 @@ func TestTryParseDuration_Failure(t *testing.T) {
|
|||
f("2s 3ms")
|
||||
}
|
||||
|
||||
func TestMarshalDuration(t *testing.T) {
|
||||
func TestMarshalDurationString(t *testing.T) {
|
||||
f := func(nsecs int64, resultExpected string) {
|
||||
t.Helper()
|
||||
|
||||
result := marshalDuration(nil, nsecs)
|
||||
result := marshalDurationString(nil, nsecs)
|
||||
if string(result) != resultExpected {
|
||||
t.Fatalf("unexpected result; got %q; want %q", result, resultExpected)
|
||||
}
|
||||
|
|
|
@ -14,12 +14,12 @@ import (
|
|||
//
|
||||
// It returns unix timestamp in milliseconds.
|
||||
func ParseTimeMsec(s string) (int64, error) {
|
||||
currentTimestamp := float64(time.Now().UnixNano()) / 1e9
|
||||
secs, err := ParseTimeAt(s, currentTimestamp)
|
||||
currentTimestamp := time.Now().UnixNano()
|
||||
nsecs, err := ParseTimeAt(s, currentTimestamp)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
msecs := int64(math.Round(secs * 1000))
|
||||
msecs := int64(math.Round(float64(nsecs) / 1e6))
|
||||
return msecs, nil
|
||||
}
|
||||
|
||||
|
@ -33,13 +33,13 @@ const (
|
|||
//
|
||||
// See https://docs.victoriametrics.com/single-server-victoriametrics/#timestamp-formats
|
||||
//
|
||||
// It returns unix timestamp in seconds.
|
||||
func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
||||
// It returns unix timestamp in nanoseconds.
|
||||
func ParseTimeAt(s string, currentTimestamp int64) (int64, error) {
|
||||
if s == "now" {
|
||||
return currentTimestamp, nil
|
||||
}
|
||||
sOrig := s
|
||||
tzOffset := float64(0)
|
||||
tzOffset := int64(0)
|
||||
if len(sOrig) > 6 {
|
||||
// Try parsing timezone offset
|
||||
tz := sOrig[len(sOrig)-6:]
|
||||
|
@ -53,7 +53,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot parse minute from timezone offset %q: %w", tz, err)
|
||||
}
|
||||
tzOffset = float64(hour*3600 + minute*60)
|
||||
tzOffset = int64(hour*3600+minute*60) * 1e9
|
||||
if isPlus {
|
||||
tzOffset = -tzOffset
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if d > 0 {
|
||||
d = -d
|
||||
}
|
||||
return currentTimestamp + float64(d)/1e9, nil
|
||||
return currentTimestamp + int64(d), nil
|
||||
}
|
||||
if len(s) == 4 {
|
||||
// Parse YYYY
|
||||
|
@ -83,7 +83,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if y > maxValidYear || y < minValidYear {
|
||||
return 0, fmt.Errorf("cannot parse year from %q: year must in range [%d, %d]", s, minValidYear, maxValidYear)
|
||||
}
|
||||
return tzOffset + float64(t.UnixNano())/1e9, nil
|
||||
return tzOffset + t.UnixNano(), nil
|
||||
}
|
||||
if !strings.Contains(sOrig, "-") {
|
||||
// Parse the timestamp in seconds or in milliseconds
|
||||
|
@ -95,7 +95,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
// The timestamp is in milliseconds. Convert it to seconds.
|
||||
ts /= 1000
|
||||
}
|
||||
return ts, nil
|
||||
return int64(math.Round(ts*1e3)) * 1e6, nil
|
||||
}
|
||||
if len(s) == 7 {
|
||||
// Parse YYYY-MM
|
||||
|
@ -103,7 +103,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return tzOffset + float64(t.UnixNano())/1e9, nil
|
||||
return tzOffset + t.UnixNano(), nil
|
||||
}
|
||||
if len(s) == 10 {
|
||||
// Parse YYYY-MM-DD
|
||||
|
@ -111,7 +111,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return tzOffset + float64(t.UnixNano())/1e9, nil
|
||||
return tzOffset + t.UnixNano(), nil
|
||||
}
|
||||
if len(s) == 13 {
|
||||
// Parse YYYY-MM-DDTHH
|
||||
|
@ -119,7 +119,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return tzOffset + float64(t.UnixNano())/1e9, nil
|
||||
return tzOffset + t.UnixNano(), nil
|
||||
}
|
||||
if len(s) == 16 {
|
||||
// Parse YYYY-MM-DDTHH:MM
|
||||
|
@ -127,7 +127,7 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return tzOffset + float64(t.UnixNano())/1e9, nil
|
||||
return tzOffset + t.UnixNano(), nil
|
||||
}
|
||||
if len(s) == 19 {
|
||||
// Parse YYYY-MM-DDTHH:MM:SS
|
||||
|
@ -135,12 +135,12 @@ func ParseTimeAt(s string, currentTimestamp float64) (float64, error) {
|
|||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return tzOffset + float64(t.UnixNano())/1e9, nil
|
||||
return tzOffset + t.UnixNano(), nil
|
||||
}
|
||||
// Parse RFC3339
|
||||
t, err := time.Parse(time.RFC3339, sOrig)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return float64(t.UnixNano()) / 1e9, nil
|
||||
return t.UnixNano(), nil
|
||||
}
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
)
|
||||
|
||||
func TestParseTimeAtSuccess(t *testing.T) {
|
||||
f := func(s string, currentTime, resultExpected float64) {
|
||||
f := func(s string, currentTime, resultExpected int64) {
|
||||
t.Helper()
|
||||
result, err := ParseTimeAt(s, currentTime)
|
||||
if err != nil {
|
||||
|
@ -17,65 +17,65 @@ func TestParseTimeAtSuccess(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
now := float64(time.Now().UnixNano()) / 1e9
|
||||
now := time.Now().UnixNano()
|
||||
|
||||
// unix timestamp in seconds
|
||||
f("1562529662", now, 1562529662)
|
||||
f("1562529662.678", now, 1562529662.678)
|
||||
f("1562529662", now, 1562529662*1e9)
|
||||
f("1562529662.678", now, 1562529662678*1e6)
|
||||
|
||||
// unix timestamp in milliseconds
|
||||
f("1562529662678", now, 1562529662.678)
|
||||
f("1562529662678", now, 1562529662678*1e6)
|
||||
|
||||
// duration relative to the current time
|
||||
f("now", now, now)
|
||||
f("1h5s", now, now-3605)
|
||||
f("1h5s", now, now-3605*1e9)
|
||||
|
||||
// negative duration relative to the current time
|
||||
f("-5m", now, now-5*60)
|
||||
f("-123", now, now-123)
|
||||
f("-123.456", now, now-123.456)
|
||||
f("now-1h5m", now, now-(3600+5*60))
|
||||
f("-5m", now, now-5*60*1e9)
|
||||
f("-123", now, now-123*1e9)
|
||||
f("-123.456", now, now-123456*1e6)
|
||||
f("now-1h5m", now, now-(3600+5*60)*1e9)
|
||||
|
||||
// Year
|
||||
f("2023", now, 1.6725312e+09)
|
||||
f("2023Z", now, 1.6725312e+09)
|
||||
f("2023+02:00", now, 1.672524e+09)
|
||||
f("2023-02:00", now, 1.6725384e+09)
|
||||
f("2023", now, 1.6725312e+09*1e9)
|
||||
f("2023Z", now, 1.6725312e+09*1e9)
|
||||
f("2023+02:00", now, 1.672524e+09*1e9)
|
||||
f("2023-02:00", now, 1.6725384e+09*1e9)
|
||||
|
||||
// Year and month
|
||||
f("2023-05", now, 1.6828992e+09)
|
||||
f("2023-05Z", now, 1.6828992e+09)
|
||||
f("2023-05+02:00", now, 1.682892e+09)
|
||||
f("2023-05-02:00", now, 1.6829064e+09)
|
||||
f("2023-05", now, 1.6828992e+09*1e9)
|
||||
f("2023-05Z", now, 1.6828992e+09*1e9)
|
||||
f("2023-05+02:00", now, 1.682892e+09*1e9)
|
||||
f("2023-05-02:00", now, 1.6829064e+09*1e9)
|
||||
|
||||
// Year, month and day
|
||||
f("2023-05-20", now, 1.6845408e+09)
|
||||
f("2023-05-20Z", now, 1.6845408e+09)
|
||||
f("2023-05-20+02:30", now, 1.6845318e+09)
|
||||
f("2023-05-20-02:30", now, 1.6845498e+09)
|
||||
f("2023-05-20", now, 1.6845408e+09*1e9)
|
||||
f("2023-05-20Z", now, 1.6845408e+09*1e9)
|
||||
f("2023-05-20+02:30", now, 1.6845318e+09*1e9)
|
||||
f("2023-05-20-02:30", now, 1.6845498e+09*1e9)
|
||||
|
||||
// Year, month, day and hour
|
||||
f("2023-05-20T04", now, 1.6845552e+09)
|
||||
f("2023-05-20T04Z", now, 1.6845552e+09)
|
||||
f("2023-05-20T04+02:30", now, 1.6845462e+09)
|
||||
f("2023-05-20T04-02:30", now, 1.6845642e+09)
|
||||
f("2023-05-20T04", now, 1.6845552e+09*1e9)
|
||||
f("2023-05-20T04Z", now, 1.6845552e+09*1e9)
|
||||
f("2023-05-20T04+02:30", now, 1.6845462e+09*1e9)
|
||||
f("2023-05-20T04-02:30", now, 1.6845642e+09*1e9)
|
||||
|
||||
// Year, month, day, hour and minute
|
||||
f("2023-05-20T04:57", now, 1.68455862e+09)
|
||||
f("2023-05-20T04:57Z", now, 1.68455862e+09)
|
||||
f("2023-05-20T04:57+02:30", now, 1.68454962e+09)
|
||||
f("2023-05-20T04:57-02:30", now, 1.68456762e+09)
|
||||
f("2023-05-20T04:57", now, 1.68455862e+09*1e9)
|
||||
f("2023-05-20T04:57Z", now, 1.68455862e+09*1e9)
|
||||
f("2023-05-20T04:57+02:30", now, 1.68454962e+09*1e9)
|
||||
f("2023-05-20T04:57-02:30", now, 1.68456762e+09*1e9)
|
||||
|
||||
// Year, month, day, hour, minute and second
|
||||
f("2023-05-20T04:57:43", now, 1.684558663e+09)
|
||||
f("2023-05-20T04:57:43Z", now, 1.684558663e+09)
|
||||
f("2023-05-20T04:57:43+02:30", now, 1.684549663e+09)
|
||||
f("2023-05-20T04:57:43-02:30", now, 1.684567663e+09)
|
||||
f("2023-05-20T04:57:43", now, 1.684558663e+09*1e9)
|
||||
f("2023-05-20T04:57:43Z", now, 1.684558663e+09*1e9)
|
||||
f("2023-05-20T04:57:43+02:30", now, 1.684549663e+09*1e9)
|
||||
f("2023-05-20T04:57:43-02:30", now, 1.684567663e+09*1e9)
|
||||
|
||||
// milliseconds
|
||||
f("2023-05-20T04:57:43.123Z", now, 1.6845586631230001e+09)
|
||||
f("2023-05-20T04:57:43.123456789+02:30", now, 1.6845496631234567e+09)
|
||||
f("2023-05-20T04:57:43.123456789-02:30", now, 1.6845676631234567e+09)
|
||||
f("2023-05-20T04:57:43.123Z", now, 1684558663123000000)
|
||||
f("2023-05-20T04:57:43.123456789+02:30", now, 1684549663123456789)
|
||||
f("2023-05-20T04:57:43.123456789-02:30", now, 1684567663123456789)
|
||||
}
|
||||
|
||||
func TestParseTimeMsecFailure(t *testing.T) {
|
||||
|
|
Loading…
Reference in a new issue