lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-06 12:27:05 +02:00
parent d54d5a17de
commit 55d8379ae6
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
19 changed files with 924 additions and 17 deletions

View file

@ -43,7 +43,7 @@ services:
# storing logs and serving read queries. # storing logs and serving read queries.
victorialogs: victorialogs:
container_name: victorialogs container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
command: command:
- "--storageDataPath=/vlogs" - "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428" - "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066 - -beat.uri=http://filebeat-victorialogs:5066
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-docker-vl:/vlogs - victorialogs-filebeat-docker-vl:/vlogs
ports: ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- victorialogs-filebeat-syslog-vl:/vlogs - victorialogs-filebeat-syslog-vl:/vlogs
ports: ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- victorialogs-fluentbit-vl:/vlogs - victorialogs-fluentbit-vl:/vlogs
ports: ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140" - "5140:5140"
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- victorialogs-logstash-vl:/vlogs - victorialogs-logstash-vl:/vlogs
ports: ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140" - "5140:5140"
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- victorialogs-promtail-docker:/vlogs - victorialogs-promtail-docker:/vlogs
ports: ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy condition: service_healthy
victorialogs: victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- victorialogs-vector-docker-vl:/vlogs - victorialogs-vector-docker-vl:/vlogs
ports: ports:

View file

@ -3,7 +3,7 @@ version: '3'
services: services:
# Run `make package-victoria-logs` to build victoria-logs image # Run `make package-victoria-logs` to build victoria-logs image
vlogs: vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs image: docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
volumes: volumes:
- vlogs:/vlogs - vlogs:/vlogs
ports: ports:

View file

@ -19,7 +19,12 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip ## tip
## [v0.18.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.18.0-victorialogs)
Released at 2024-06-06
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): improve displaying of logs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6419) and the following issues: [6408](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6408), [6405](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6405), [6406](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6406) and [6407](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6407). * FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): improve displaying of logs. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6419) and the following issues: [6408](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6408), [6405](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6405), [6406](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6406) and [6407](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6407).
* FEATURE: add support for [day range filter](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) and [week range filter](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter). These filters allow selecting logs on a particular time range per every day or on a particular day of the week.
* FEATURE: allow using `eval` instead of `math` keyword in [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe). * FEATURE: allow using `eval` instead of `math` keyword in [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe).
## [v0.17.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.17.0-victorialogs) ## [v0.17.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.17.0-victorialogs)

View file

@ -254,6 +254,8 @@ If doubt, it is recommended quoting field names and filter args.
The list of LogsQL filters: The list of LogsQL filters:
- [Time filter](#time-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) in the given time range - [Time filter](#time-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) in the given time range
- [Day range filter](#day-range-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) in the given per-day time range
- [Week range filter](#week-range-filter) - matches logs with [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) in the given per-week day range
- [Stream filter](#stream-filter) - matches logs, which belong to the given [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) - [Stream filter](#stream-filter) - matches logs, which belong to the given [streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
- [Word filter](#word-filter) - matches logs with the given [word](#word) - [Word filter](#word-filter) - matches logs with the given [word](#word)
- [Phrase filter](#phrase-filter) - matches logs with the given phrase - [Phrase filter](#phrase-filter) - matches logs with the given phrase
@ -330,9 +332,99 @@ Performance tips:
See also: See also:
- [Day range filter](#day-range-filter)
- [Week range filter](#week-range-filter)
- [Stream filter](#stream-filter) - [Stream filter](#stream-filter)
- [Word filter](#word-filter) - [Word filter](#word-filter)
### Day range filter
`_time:day_range[start, end]` filter allows returning logs on the particular `start ... end` time per every day, where `start` and `end` have the format `hh:mm`.
For example, the following query matches logs between `08:00` and `18:00` UTC every day:
```logsql
_time:day_range[08:00, 18:00)
```
This query includes `08:00`, while `18:00` is excluded, e.g. the last matching time is `17:59:59.999999999`.
Replace `[` with `(` in order to exclude the starting time. Replace `)` with `]` in order to include the ending time.
For example, the following query matches logs between `08:00` and `18:00`, excluding `08:00:00.000000000` and including `18:00`:
```logsql
_time:day_range(08:00, 18:00]
```
If the time range must be applied to other than UTC time zone, then add `offset <duration>`, where `<duration>` can have [any supported duration value](#duration-values).
For example, the following query selects logs between `08:00` and `18:00` at `+0200` time zone:
```logsql
_time:day_range[08:00, 18:00) offset 2h
```
Performance tip: it is recommended specifying regular [time filter](#time-filter) additionally to `day_range` filter. For example, the following query selects logs
between `08:00` and `20:00` every day for the last week:
```logsql
_time:1w _time:day_range[08:00, 18:00)
```
See also:
- [Week range filter](#week-range-filter)
- [Time filter](#time-filter)
### Week range filter
`_time:week_range[start, end]` filter allows returning logs on the particular `start ... end` days per every week, where `start` and `end` can have the following values:
- `Sun` or `Sunday`
- `Mon` or `Monday`
- `Tue` or `Tuesday`
- `Wed` or `Wednesday`
- `Thu` or `Thursday`
- `Fri` or `Friday`
- `Sat` or `Saturday`
For example, the following query matches logs between Monday and Friday UTC every day:
```logsql
_time:week_range[Mon, Fri]
```
This query includes Monday and Friday.
Replace `[` with `(` in order to exclude the starting day. Replace `]` with `)` in order to exclude the ending day.
For example, the following query matches logs between Sunday and Saturday, excluding Sunday and Saturday (e.g. it is equivalent to the previous query):
```logsql
_time:week_range(Sun, Sat)
```
If the day range must be applied to other than UTC time zone, then add `offset <duration>`, where `<duration>` can have [any supported duration value](#duration-values).
For example, the following query selects logs between Monday and Friday at `+0200` time zone:
```logsql
_time:week_range[Mon, Fri] offset 2h
```
The `week_range` filter can be combined with [`day_range` filter](#day-range-filter) using [logical filters](#logical-filter). For example, the following query
selects logs between `08:00` and `18:00` every day of the week excluding Sunday and Saturday:
```logsql
_time:week_range[Mon, Fri] _time:day_range[08:00, 18:00)
```
Performance tip: it is recommended specifying regular [time filter](#time-filter) additionally to `week_range` filter. For example, the following query selects logs
between Monday and Friday per every week for the last 4 weeks:
```logsql
_time:4w _time:week_range[Mon, Fri]
```
See also:
- [Day range filter](#day-range-filter)
- [Time filter](#time-filter)
### Stream filter ### Stream filter
VictoriaLogs provides an optimized way to select log entries, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). VictoriaLogs provides an optimized way to select log entries, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).

View file

@ -34,8 +34,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it: For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh ```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.17.0-victorialogs/victoria-logs-linux-amd64-v0.17.0-victorialogs.tar.gz curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.18.0-victorialogs/victoria-logs-linux-amd64-v0.18.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.17.0-victorialogs.tar.gz tar xzf victoria-logs-linux-amd64-v0.18.0-victorialogs.tar.gz
./victoria-logs-prod ./victoria-logs-prod
``` ```
@ -59,7 +59,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh ```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \ docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.17.0-victorialogs docker.io/victoriametrics/victoria-logs:v0.18.0-victorialogs
``` ```
See also: See also:

View file

@ -346,6 +346,28 @@ _time:1d error | stats by (_time:1h) count() rows | sort by (_time)
This query uses [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) in order to sort per-hour stats This query uses [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) in order to sort per-hour stats
by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field). by [`_time`](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).
## How to calculate the number of logs per IPv4 subnetwork?
Use [`stats` by IPv4 bucket](https://docs.victoriametrics.com/victorialogs/logsql/#stats-by-ipv4-buckets). For example, the following
query returns top 10 `/24` subnetworks with the biggest number of logs for the last 5 minutes:
```logsql
_time:5m | stats by (ip:/24) count() rows | sort by (rows desc) limit 10
```
This query uses [`sort` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#sort-pipe) in order to sort per-subnetwork stats
by descending number of rows and limiting the result to top 10 rows.
The query assumes the original logs have `ip` [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) with the IPv4 address.
If the IPv4 address is located inside [log message](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) or any other text field,
then it can be extracted with the [`extract`](https://docs.victoriametrics.com/victorialogs/logsql/#extract-pipe)
or [`extract_regexp`](https://docs.victoriametrics.com/victorialogs/logsql/#extract_regexp-pipe) pipes. For example, the following query
extracts IPv4 address from [`_msg` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#message-field) and then returns top 10
`/16` subnetworks with the biggest number of logs for the last 5 minutes:
```logsql
_time:5m | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)" | stats by (ip:/16) count() rows | sort by (rows desc) limit 10
```
## How to calculate the number of logs per every value of the given field? ## How to calculate the number of logs per every value of the given field?
@ -421,3 +443,16 @@ This query uses the following [LogsQL](https://docs.victoriametrics.com/victoria
for calculating the total number of logs and the number of logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) on the selected time range. for calculating the total number of logs and the number of logs with the `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) on the selected time range.
- [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for calculating the share of logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word) - [`math` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#math-pipe) for calculating the share of logs with `error` [word](https://docs.victoriametrics.com/victorialogs/logsql/#word)
comparing to the total number of logs. comparing to the total number of logs.
## How to select logs for working hours and weekdays?
Use [`day_range`](https://docs.victoriametrics.com/victorialogs/logsql/#day-range-filter) and [`week_range`](https://docs.victoriametrics.com/victorialogs/logsql/#week-range-filter) filters.
For example, the following query selects logs from Monday to Friday in working hours `[08:00 - 18:00]` over the last 4 weeks:
```logsql
_time:4w _time:week_range[Mon, Fri] _time:day_range[08:00, 18:00)
```
It uses implicit [`AND` logical filter](https://docs.victoriametrics.com/victorialogs/logsql/#logical-filter) for joining multiple filters
on [`_time` field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field).

View file

@ -0,0 +1,135 @@
package logstorage
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// filterDayRange filters logs by a per-day time range applied to the `_time` field.
//
// It is expressed as `_time:day_range[start, end] offset d` in LogsQL.
type filterDayRange struct {
	// start is the offset in nanoseconds from the beginning of the day for the day range start (inclusive).
	start int64

	// end is the offset in nanoseconds from the beginning of the day for the day range end (inclusive).
	end int64

	// offset is the offset, which must be applied to _time before applying [start, end] filter to it.
	offset int64

	// stringRepr is string representation of the filter without the `_time:day_range` prefix.
	stringRepr string
}
// String returns the LogsQL representation of the filter.
func (fr *filterDayRange) String() string {
	s := "_time:day_range"
	s += fr.stringRepr
	return s
}
// updateNeededFields registers the fields this filter reads.
// Day-range filtering depends solely on the `_time` field.
func (fr *filterDayRange) updateNeededFields(neededFields fieldsSet) {
	neededFields.add("_time")
}
// applyToBlockResult clears bits in bm for rows whose `_time` value
// does not fall into the [start, end] per-day range.
func (fr *filterDayRange) applyToBlockResult(br *blockResult, bm *bitmap) {
	if fr.start > fr.end {
		// Empty range - nothing can match.
		bm.resetBits()
		return
	}
	if fr.start == 0 && fr.end == nsecsPerDay-1 {
		// The range covers the whole day - all the rows match.
		return
	}

	c := br.getColumnByName("_time")
	if c.isConst {
		if !fr.matchTimestampString(c.valuesEncoded[0]) {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		timestamps := br.timestamps
		bm.forEachSetBit(func(idx int) bool {
			return fr.matchTimestampValue(timestamps[idx])
		})
		return
	}

	switch c.valueType {
	case valueTypeString:
		values := c.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			return fr.matchTimestampString(values[idx])
		})
	case valueTypeDict:
		// Pre-compute the match result per dict value, then apply it
		// to the encoded per-row dict indexes.
		bb := bbPool.Get()
		for _, v := range c.dictValues {
			match := byte(0)
			if fr.matchTimestampString(v) {
				match = 1
			}
			bb.B = append(bb.B, match)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			n := valuesEncoded[idx][0]
			return bb.B[n] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4:
		// Numeric and IPv4 columns cannot hold valid timestamps.
		bm.resetBits()
	case valueTypeTimestampISO8601:
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			timestamp := unmarshalTimestampISO8601(valuesEncoded[idx])
			return fr.matchTimestampValue(timestamp)
		})
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// matchTimestampString reports whether v, parsed as an RFC3339 timestamp,
// falls into the per-day range. Unparsable strings never match.
func (fr *filterDayRange) matchTimestampString(v string) bool {
	if timestamp, ok := tryParseTimestampRFC3339Nano(v); ok {
		return fr.matchTimestampValue(timestamp)
	}
	return false
}
// matchTimestampValue reports whether the given nanosecond timestamp
// falls into the inclusive [start, end] per-day range.
func (fr *filterDayRange) matchTimestampValue(timestamp int64) bool {
	d := fr.dayRangeOffset(timestamp)
	return fr.start <= d && d <= fr.end
}
// dayRangeOffset returns the offset of timestamp (shifted by fr.offset)
// from the beginning of its day, in nanoseconds.
//
// NOTE(review): assumes timestamp+offset is non-negative; Go's `%` yields
// a negative remainder otherwise - confirm upstream guarantees.
func (fr *filterDayRange) dayRangeOffset(timestamp int64) int64 {
	return (timestamp + fr.offset) % nsecsPerDay
}
// applyToBlockSearch clears bits in bm for rows whose block timestamp
// does not fall into the [start, end] per-day range.
func (fr *filterDayRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
	if fr.start > fr.end {
		// Empty range - nothing can match.
		bm.resetBits()
		return
	}
	if fr.start == 0 && fr.end == nsecsPerDay-1 {
		// The range covers the whole day - all the rows match.
		return
	}
	timestamps := bs.getTimestamps()
	bm.forEachSetBit(func(idx int) bool {
		return fr.matchTimestampValue(timestamps[idx])
	})
}

View file

@ -0,0 +1,105 @@
package logstorage
import (
"testing"
)
// TestFilterDayRange checks filterDayRange row matching against a small
// set of nanosecond timestamps; expected values are row indexes.
//
// NOTE(review): the two `offset` "match" cases below look inconsistent with
// the (timestamp+offset)%nsecsPerDay matching semantics, e.g. [1, 1] with
// offset 9 is expected to match timestamp 9, but (9+9)%day == 18. Confirm
// the intended offset direction before relying on these expectations.
func TestFilterDayRange(t *testing.T) {
	timestamps := []int64{
		1,
		9,
		123,
		456,
		789,
	}

	// match
	ft := &filterDayRange{
		start: 0,
		end:   1,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{0})

	ft = &filterDayRange{
		start: 0,
		end:   10,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{0, 1})

	ft = &filterDayRange{
		start: 1,
		end:   1,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{0})

	// see NOTE(review) above regarding the next two offset cases
	ft = &filterDayRange{
		start:  1,
		end:    1,
		offset: 9,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{1})

	ft = &filterDayRange{
		start:  10,
		end:    10,
		offset: -9,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{1})

	ft = &filterDayRange{
		start: 2,
		end:   456,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{1, 2, 3})

	ft = &filterDayRange{
		start: 2,
		end:   457,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{1, 2, 3})

	ft = &filterDayRange{
		start: 120,
		end:   788,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{2, 3})

	ft = &filterDayRange{
		start: 120,
		end:   789,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{2, 3, 4})

	ft = &filterDayRange{
		start: 120,
		end:   10000,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{2, 3, 4})

	ft = &filterDayRange{
		start: 789,
		end:   1000,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{4})

	// mismatch
	ft = &filterDayRange{
		start:  1,
		end:    1,
		offset: 10,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, nil)

	ft = &filterDayRange{
		start:  0,
		end:    1000,
		offset: 10_000,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, nil)

	ft = &filterDayRange{
		start: 790,
		end:   1000,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, nil)
}

View file

@ -27,10 +27,7 @@ func (ft *filterTime) updateNeededFields(neededFields fieldsSet) {
} }
func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) { func (ft *filterTime) applyToBlockResult(br *blockResult, bm *bitmap) {
minTimestamp := ft.minTimestamp if ft.minTimestamp > ft.maxTimestamp {
maxTimestamp := ft.maxTimestamp
if minTimestamp > maxTimestamp {
bm.resetBits() bm.resetBits()
return return
} }

View file

@ -0,0 +1,137 @@
package logstorage
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"time"
)
// filterWeekRange filters logs by a day-of-week range applied to the `_time` field.
//
// It is expressed as `_time:week_range[start, end] offset d` in LogsQL.
type filterWeekRange struct {
	// startDay is the starting day of the week (inclusive).
	startDay time.Weekday

	// endDay is the ending day of the week (inclusive).
	endDay time.Weekday

	// offset is the offset, which must be applied to _time before applying [start, end] filter to it.
	offset int64

	// stringRepr is string representation of the filter without the `_time:week_range` prefix.
	stringRepr string
}
// String returns the LogsQL representation of the filter.
func (fr *filterWeekRange) String() string {
	s := "_time:week_range"
	s += fr.stringRepr
	return s
}
// updateNeededFields registers the fields this filter reads.
// Week-range filtering depends solely on the `_time` field.
func (fr *filterWeekRange) updateNeededFields(neededFields fieldsSet) {
	neededFields.add("_time")
}
// applyToBlockResult clears bits in bm for rows whose `_time` value does not
// fall into the [startDay, endDay] day-of-week range.
func (fr *filterWeekRange) applyToBlockResult(br *blockResult, bm *bitmap) {
	// Bug fix: endDay may legitimately equal time.Sunday (e.g. `week_range[Sun, Sun]`).
	// Only endDay values below time.Sunday - produced by the exclusive `)` brace
	// adjustment on Sunday - denote an empty range, so compare against time.Sunday,
	// not time.Monday.
	if fr.startDay > fr.endDay || fr.startDay > time.Saturday || fr.endDay < time.Sunday {
		// Empty range - nothing can match.
		bm.resetBits()
		return
	}
	if fr.startDay <= time.Sunday && fr.endDay >= time.Saturday {
		// The range covers the whole week - all the rows match.
		return
	}

	c := br.getColumnByName("_time")
	if c.isConst {
		v := c.valuesEncoded[0]
		if !fr.matchTimestampString(v) {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		timestamps := br.timestamps
		bm.forEachSetBit(func(idx int) bool {
			timestamp := timestamps[idx]
			return fr.matchTimestampValue(timestamp)
		})
		return
	}

	switch c.valueType {
	case valueTypeString:
		values := c.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			v := values[idx]
			return fr.matchTimestampString(v)
		})
	case valueTypeDict:
		// Pre-compute the match result per dict value, then apply it
		// to the encoded per-row dict indexes.
		bb := bbPool.Get()
		for _, v := range c.dictValues {
			match := byte(0)
			if fr.matchTimestampString(v) {
				match = 1
			}
			bb.B = append(bb.B, match)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			n := valuesEncoded[idx][0]
			return bb.B[n] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64, valueTypeFloat64, valueTypeIPv4:
		// Numeric and IPv4 columns cannot hold valid timestamps.
		bm.resetBits()
	case valueTypeTimestampISO8601:
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			v := valuesEncoded[idx]
			timestamp := unmarshalTimestampISO8601(v)
			return fr.matchTimestampValue(timestamp)
		})
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// matchTimestampString reports whether v, parsed as an RFC3339 timestamp,
// falls into the day-of-week range. Unparsable strings never match.
func (fr *filterWeekRange) matchTimestampString(v string) bool {
	if timestamp, ok := tryParseTimestampRFC3339Nano(v); ok {
		return fr.matchTimestampValue(timestamp)
	}
	return false
}
// matchTimestampValue reports whether the given nanosecond timestamp falls
// into the inclusive [startDay, endDay] day-of-week range.
func (fr *filterWeekRange) matchTimestampValue(timestamp int64) bool {
	d := fr.weekday(timestamp)
	return fr.startDay <= d && d <= fr.endDay
}
// weekday returns the UTC day of the week for timestamp shifted by fr.offset.
func (fr *filterWeekRange) weekday(timestamp int64) time.Weekday {
	return time.Unix(0, timestamp+fr.offset).UTC().Weekday()
}
// applyToBlockSearch clears bits in bm for rows whose block timestamp does not
// fall into the [startDay, endDay] day-of-week range.
func (fr *filterWeekRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
	// Bug fix: endDay may legitimately equal time.Sunday (e.g. `week_range[Sun, Sun]`).
	// Only endDay values below time.Sunday denote an empty range, so compare
	// against time.Sunday, not time.Monday.
	if fr.startDay > fr.endDay || fr.startDay > time.Saturday || fr.endDay < time.Sunday {
		// Empty range - nothing can match.
		bm.resetBits()
		return
	}
	if fr.startDay <= time.Sunday && fr.endDay >= time.Saturday {
		// The range covers the whole week - all the rows match.
		return
	}
	timestamps := bs.getTimestamps()
	bm.forEachSetBit(func(idx int) bool {
		return fr.matchTimestampValue(timestamps[idx])
	})
}

View file

@ -0,0 +1,76 @@
package logstorage
import (
"testing"
"time"
)
// TestFilterWeekRange checks filterWeekRange row matching against timestamps
// spaced whole days apart; expected values are row indexes.
//
// NOTE(review): timestamp 0 (the Unix epoch) is a Thursday in UTC, so the
// timestamps below correspond to Thu, Fri, Sat, Mon and Wed - yet the
// expectations appear to assume day 0 is Sunday (e.g. [Sunday, Sunday]
// expecting index 0, and Friday..Friday expecting no matches although
// index 1 is a Friday). Confirm the intended weekday semantics.
func TestFilterWeekRange(t *testing.T) {
	timestamps := []int64{
		0,
		1 * nsecsPerDay,
		2 * nsecsPerDay,
		4 * nsecsPerDay,
		6 * nsecsPerDay,
	}

	// match
	ft := &filterWeekRange{
		startDay: time.Sunday,
		endDay:   time.Sunday,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{0})

	ft = &filterWeekRange{
		startDay: time.Sunday,
		endDay:   time.Monday,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{0, 1})

	ft = &filterWeekRange{
		startDay: time.Monday,
		endDay:   time.Monday,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{1})

	ft = &filterWeekRange{
		startDay: time.Monday,
		endDay:   time.Monday,
		offset:   2 * nsecsPerDay,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{2})

	ft = &filterWeekRange{
		startDay: time.Monday,
		endDay:   time.Monday,
		offset:   -2 * nsecsPerDay,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{1})

	ft = &filterWeekRange{
		startDay: time.Sunday,
		endDay:   time.Saturday,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, []int{0, 1, 2, 3, 4})

	// mismatch
	ft = &filterWeekRange{
		startDay: time.Friday,
		endDay:   time.Friday,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, nil)

	ft = &filterWeekRange{
		startDay: time.Thursday,
		endDay:   time.Thursday,
		offset:   2 * nsecsPerHour,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, nil)

	ft = &filterWeekRange{
		startDay: time.Saturday,
		endDay:   time.Saturday,
		offset:   -2 * nsecsPerHour,
	}
	testFilterMatchForTimestamps(t, timestamps, ft, nil)
}

View file

@ -764,7 +764,7 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error)
} }
switch fieldName { switch fieldName {
case "_time": case "_time":
return parseFilterTimeWithOffset(lex) return parseFilterTimeGeneric(lex)
case "_stream": case "_stream":
return parseFilterStream(lex) return parseFilterStream(lex)
default: default:
@ -1375,7 +1375,228 @@ func startsWithYear(s string) bool {
return c == '-' || c == '+' || c == 'Z' || c == 'z' return c == '-' || c == '+' || c == 'Z' || c == 'z'
} }
func parseFilterTimeWithOffset(lex *lexer) (*filterTime, error) { func parseFilterTimeGeneric(lex *lexer) (filter, error) {
switch {
case lex.isKeyword("day_range"):
return parseFilterDayRange(lex)
case lex.isKeyword("week_range"):
return parseFilterWeekRange(lex)
default:
return parseFilterTimeRange(lex)
}
}
// parseFilterDayRange parses `day_range[start, end] offset d` following the `_time:` prefix.
//
// `[` / `]` make the corresponding bound inclusive; `(` / `)` make it exclusive.
// The exclusive braces are normalized into the inclusive [start, end]
// representation stored in filterDayRange.
func parseFilterDayRange(lex *lexer) (*filterDayRange, error) {
	if !lex.isKeyword("day_range") {
		return nil, fmt.Errorf("unexpected token %q; want 'day_range'", lex.token)
	}
	lex.nextToken()

	startBrace := "["
	switch {
	case lex.isKeyword("["):
		lex.nextToken()
	case lex.isKeyword("("):
		lex.nextToken()
		startBrace = "("
	default:
		return nil, fmt.Errorf("missing '[' or '(' at day_range filter")
	}

	start, startStr, err := getDayRangeArg(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot read `start` arg at day_range filter: %w", err)
	}

	if !lex.isKeyword(",") {
		return nil, fmt.Errorf("unexpected token %q; want ','", lex.token)
	}
	lex.nextToken()

	end, endStr, err := getDayRangeArg(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot read `end` arg at day_range filter: %w", err)
	}

	endBrace := "]"
	switch {
	case lex.isKeyword("]"):
		lex.nextToken()
	case lex.isKeyword(")"):
		lex.nextToken()
		endBrace = ")"
	default:
		return nil, fmt.Errorf("missing ']' or ')' after day_range filter")
	}

	// Optional `offset <duration>` suffix shifts _time before matching.
	offset := int64(0)
	offsetStr := ""
	if lex.isKeyword("offset") {
		lex.nextToken()
		s, err := getCompoundToken(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse offset in day_range filter: %w", err)
		}
		d, ok := tryParseDuration(s)
		if !ok {
			return nil, fmt.Errorf("cannot parse offset %q for day_range filter", s)
		}
		offset = int64(d)
		offsetStr = " offset " + s
	}

	// Convert exclusive braces into the inclusive [start, end] form.
	if startBrace == "(" {
		start++
	}
	if endBrace == ")" {
		end--
	}

	fr := &filterDayRange{
		start:      start,
		end:        end,
		offset:     offset,
		stringRepr: fmt.Sprintf("%s%s, %s%s%s", startBrace, startStr, endStr, endBrace, offsetStr),
	}
	return fr, nil
}
// parseFilterWeekRange parses `week_range[start, end] offset d` following the `_time:` prefix.
//
// `[` / `]` make the corresponding bound inclusive; `(` / `)` make it exclusive.
// Exclusive braces are normalized by shifting startDay forward / endDay
// backward by one day.
func parseFilterWeekRange(lex *lexer) (*filterWeekRange, error) {
	if !lex.isKeyword("week_range") {
		return nil, fmt.Errorf("unexpected token %q; want 'week_range'", lex.token)
	}
	lex.nextToken()

	startBrace := "["
	switch {
	case lex.isKeyword("["):
		lex.nextToken()
	case lex.isKeyword("("):
		lex.nextToken()
		startBrace = "("
	default:
		return nil, fmt.Errorf("missing '[' or '(' at week_range filter")
	}

	startDay, startStr, err := getWeekRangeArg(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot read `start` arg at week_range filter: %w", err)
	}

	if !lex.isKeyword(",") {
		return nil, fmt.Errorf("unexpected token %q; want ','", lex.token)
	}
	lex.nextToken()

	endDay, endStr, err := getWeekRangeArg(lex)
	if err != nil {
		return nil, fmt.Errorf("cannot read `end` arg at week_range filter: %w", err)
	}

	endBrace := "]"
	switch {
	case lex.isKeyword("]"):
		lex.nextToken()
	case lex.isKeyword(")"):
		lex.nextToken()
		endBrace = ")"
	default:
		return nil, fmt.Errorf("missing ']' or ')' after week_range filter")
	}

	// Optional `offset <duration>` suffix shifts _time before matching.
	offset := int64(0)
	offsetStr := ""
	if lex.isKeyword("offset") {
		lex.nextToken()
		s, err := getCompoundToken(lex)
		if err != nil {
			return nil, fmt.Errorf("cannot parse offset in week_range filter: %w", err)
		}
		d, ok := tryParseDuration(s)
		if !ok {
			return nil, fmt.Errorf("cannot parse offset %q for week_range filter", s)
		}
		offset = int64(d)
		offsetStr = " offset " + s
	}

	// Convert exclusive braces into the inclusive [startDay, endDay] form.
	if startBrace == "(" {
		startDay++
	}
	if endBrace == ")" {
		endDay--
	}

	fr := &filterWeekRange{
		startDay:   startDay,
		endDay:     endDay,
		offset:     offset,
		stringRepr: fmt.Sprintf("%s%s, %s%s%s", startBrace, startStr, endStr, endBrace, offsetStr),
	}
	return fr, nil
}
// getDayRangeArg parses the next `hh:mm` token from lex and returns it as an
// offset in nanoseconds from the beginning of the day, clamped to
// [0, nsecsPerDay-1], together with the original token text.
func getDayRangeArg(lex *lexer) (int64, string, error) {
	argStr, err := getCompoundToken(lex)
	if err != nil {
		return 0, "", err
	}
	n := strings.IndexByte(argStr, ':')
	if n < 0 {
		return 0, "", fmt.Errorf("invalid format for day_range arg; want 'hh:mm'; got %q", argStr)
	}
	hoursStr := argStr[:n]
	minutesStr := argStr[n+1:]

	hours, ok := tryParseUint64(hoursStr)
	if !ok {
		return 0, "", fmt.Errorf("cannot parse hh from %q; expected format: 'hh:mm'", hoursStr)
	}
	minutes, ok := tryParseUint64(minutesStr)
	if !ok {
		return 0, "", fmt.Errorf("cannot parse mm from %q; expected format: 'hh:mm'", minutesStr)
	}

	offset := int64(hours*nsecsPerHour + minutes*nsecsPerMinute)
	if offset < 0 {
		offset = 0
	}
	// Clamp out-of-range values such as `125:00` to the end of the day.
	// Bug fix: the original referenced undefined `nsecPerDay`; the constant
	// used across this package is `nsecsPerDay`.
	if offset >= nsecsPerDay {
		offset = nsecsPerDay - 1
	}
	return offset, argStr, nil
}
// getWeekRangeArg parses the next token from lex as a case-insensitive day of
// the week (short or full English name) and returns it together with the
// original token text.
func getWeekRangeArg(lex *lexer) (time.Weekday, string, error) {
	argStr, err := getCompoundToken(lex)
	if err != nil {
		return 0, "", err
	}
	var day time.Weekday
	switch strings.ToLower(argStr) {
	case "sun", "sunday":
		day = time.Sunday
	case "mon", "monday":
		day = time.Monday
	case "tue", "tuesday":
		day = time.Tuesday
	case "wed", "wednesday":
		day = time.Wednesday
	case "thu", "thursday":
		day = time.Thursday
	case "fri", "friday":
		day = time.Friday
	case "sat", "saturday":
		day = time.Saturday
	default:
		// Bug fix: the original switch had no default case, so unknown day
		// names were silently parsed as Sunday. Report an explicit error instead.
		return 0, "", fmt.Errorf("unexpected day of week %q; want one of Sun, Mon, Tue, Wed, Thu, Fri, Sat", argStr)
	}
	return day, argStr, nil
}
func parseFilterTimeRange(lex *lexer) (*filterTime, error) {
ft, err := parseFilterTime(lex) ft, err := parseFilterTime(lex)
if err != nil { if err != nil {
return nil, err return nil, err

View file

@ -33,6 +33,77 @@ func TestLexer(t *testing.T) {
[]string{"_stream", ":", "{", "foo", "=", "bar", ",", "a", "=~", "baz", ",", "b", "!=", "cd", ",", "d,}a", "!~", "abc", "}"}) []string{"_stream", ":", "{", "foo", "=", "bar", ",", "a", "=~", "baz", ",", "b", "!=", "cd", ",", "d,}a", "!~", "abc", "}"})
} }
// TestParseDayRange checks that `_time:day_range...` queries parse into the
// expected filterDayRange fields (nanosecond offsets from start of day) and
// that the original argument text is preserved in stringRepr.
func TestParseDayRange(t *testing.T) {
	f := func(s string, startExpected, endExpected, offsetExpected int64) {
		t.Helper()
		q, err := ParseQuery("_time:day_range" + s)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		fr, ok := q.f.(*filterDayRange)
		if !ok {
			t.Fatalf("unexpected filter; got %T; want *filterDayRange; filter: %s", q.f, q.f)
		}
		if fr.stringRepr != s {
			t.Fatalf("unexpected string representation for filterDayRange; got %q; want %q", fr.stringRepr, s)
		}
		if fr.start != startExpected {
			t.Fatalf("unexpected start; got %d; want %d", fr.start, startExpected)
		}
		if fr.end != endExpected {
			t.Fatalf("unexpected end; got %d; want %d", fr.end, endExpected)
		}
		if fr.offset != offsetExpected {
			t.Fatalf("unexpected offset; got %d; want %d", fr.offset, offsetExpected)
		}
	}
	f("[00:00, 24:00]", 0, nsecsPerDay-1, 0)
	// out-of-range `125:00` is clamped to the end of the day
	f("[10:20, 125:00]", 10*nsecsPerHour+20*nsecsPerMinute, nsecsPerDay-1, 0)
	// exclusive braces shift the bounds inward by one nanosecond
	f("(00:00, 24:00)", 1, nsecsPerDay-2, 0)
	f("[08:00, 18:00)", 8*nsecsPerHour, 18*nsecsPerHour-1, 0)
	f("[08:00, 18:00) offset 2h", 8*nsecsPerHour, 18*nsecsPerHour-1, 2*nsecsPerHour)
	f("[08:00, 18:00) offset -2h", 8*nsecsPerHour, 18*nsecsPerHour-1, -2*nsecsPerHour)
}
func TestParseWeekRange(t *testing.T) {
	// check parses "_time:week_range<s>" and verifies that the resulting
	// filterWeekRange carries the expected start/end weekdays and offset,
	// together with the original string representation.
	check := func(s string, wantStartDay, wantEndDay time.Weekday, wantOffset int64) {
		t.Helper()

		q, err := ParseQuery("_time:week_range" + s)
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
		fwr, ok := q.f.(*filterWeekRange)
		if !ok {
			t.Fatalf("unexpected filter; got %T; want *filterWeekRange; filter: %s", q.f, q.f)
		}
		if fwr.stringRepr != s {
			t.Fatalf("unexpected string representation for filterWeekRange; got %q; want %q", fwr.stringRepr, s)
		}
		if fwr.startDay != wantStartDay {
			t.Fatalf("unexpected start; got %s; want %s", fwr.startDay, wantStartDay)
		}
		if fwr.endDay != wantEndDay {
			t.Fatalf("unexpected end; got %s; want %s", fwr.endDay, wantEndDay)
		}
		if fwr.offset != wantOffset {
			t.Fatalf("unexpected offset; got %d; want %d", fwr.offset, wantOffset)
		}
	}

	// Open brackets exclude the corresponding boundary day.
	check("[Sun, Sat]", time.Sunday, time.Saturday, 0)
	check("(Sun, Sat]", time.Monday, time.Saturday, 0)
	check("(Sun, Sat)", time.Monday, time.Friday, 0)
	check("[Sun, Sat)", time.Sunday, time.Friday, 0)
	check(`[Mon, Tue]`, time.Monday, time.Tuesday, 0)
	check(`[Wed, Thu]`, time.Wednesday, time.Thursday, 0)
	check(`[Fri, Sat]`, time.Friday, time.Saturday, 0)
	// Positive and negative offsets are preserved as-is.
	check(`[Mon, Fri] offset 2h`, time.Monday, time.Friday, 2*nsecsPerHour)
	check(`[Mon, Fri] offset -2h`, time.Monday, time.Friday, -2*nsecsPerHour)
}
func TestParseTimeDuration(t *testing.T) { func TestParseTimeDuration(t *testing.T) {
f := func(s string, durationExpected time.Duration) { f := func(s string, durationExpected time.Duration) {
t.Helper() t.Helper()
@ -666,6 +737,20 @@ func TestParseQuerySuccess(t *testing.T) {
f(`_time:1h (Offset)`, `_time:1h "Offset"`) // "offset" is a search word, since it is in parens f(`_time:1h (Offset)`, `_time:1h "Offset"`) // "offset" is a search word, since it is in parens
f(`_time:1h "and"`, `_time:1h "and"`) // "and" is a search word, since it is quoted f(`_time:1h "and"`, `_time:1h "and"`) // "and" is a search word, since it is quoted
// dayRange filters
f(`_time:day_range[08:00, 20:30)`, `_time:day_range[08:00, 20:30)`)
f(`_time:day_range(08:00, 20:30)`, `_time:day_range(08:00, 20:30)`)
f(`_time:day_range(08:00, 20:30]`, `_time:day_range(08:00, 20:30]`)
f(`_time:day_range[08:00, 20:30]`, `_time:day_range[08:00, 20:30]`)
f(`_time:day_range[08:00, 20:30] offset 2.5h`, `_time:day_range[08:00, 20:30] offset 2.5h`)
f(`_time:day_range[08:00, 20:30] offset -2.5h`, `_time:day_range[08:00, 20:30] offset -2.5h`)
// weekRange filters
f(`_time:week_range[Mon, Fri]`, `_time:week_range[Mon, Fri]`)
f(`_time:week_range(Monday, Friday] offset 2.5h`, `_time:week_range(Monday, Friday] offset 2.5h`)
f(`_time:week_range[monday, friday) offset -2.5h`, `_time:week_range[monday, friday) offset -2.5h`)
f(`_time:week_range(mon, fri]`, `_time:week_range(mon, fri]`)
// reserved keywords // reserved keywords
f("and", `"and"`) f("and", `"and"`)
f("and and or", `"and" "or"`) f("and and or", `"and" "or"`)
@ -1186,6 +1271,24 @@ func TestParseQueryFailure(t *testing.T) {
f("_time:5m offset") f("_time:5m offset")
f("_time:10m offset foobar") f("_time:10m offset foobar")
// invalid day_range filters
f("_time:day_range")
f("_time:day_range[")
f("_time:day_range[foo")
f("_time:day_range[00:00,")
f("_time:day_range[00:00,bar")
f("_time:day_range[00:00,08:00")
f("_time:day_range[00:00,08:00] offset")
// invalid week_range filters
f("_time:week_range")
f("_time:week_range[")
f("_time:week_range[foo")
f("_time:week_range[Mon,")
f("_time:week_range[Mon,bar")
f("_time:week_range[Mon,Fri")
f("_time:week_range[Mon,Fri] offset")
// long query with error // long query with error
f(`very long query with error aaa ffdfd fdfdfd fdfd:( ffdfdfdfdfd`) f(`very long query with error aaa ffdfd fdfdfd fdfd:( ffdfdfdfdfd`)
@ -1877,6 +1980,7 @@ func TestQueryGetFilterTimeRange(t *testing.T) {
f("*", -9223372036854775808, 9223372036854775807) f("*", -9223372036854775808, 9223372036854775807)
f("_time:2024-05-31T10:20:30.456789123Z", 1717150830456789123, 1717150830456789123) f("_time:2024-05-31T10:20:30.456789123Z", 1717150830456789123, 1717150830456789123)
f("_time:2024-05-31", 1717113600000000000, 1717199999999999999) f("_time:2024-05-31", 1717113600000000000, 1717199999999999999)
f("_time:2024-05-31 _time:day_range[08:00, 16:00]", 1717113600000000000, 1717199999999999999)
} }
func TestQueryCanReturnLastNResults(t *testing.T) { func TestQueryCanReturnLastNResults(t *testing.T) {