lib/logstorage: work-in-progress

This commit is contained in:
Aliaksandr Valialkin 2024-06-20 03:08:07 +02:00
parent 201fd6de1e
commit 7229dd8c33
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
55 changed files with 931 additions and 108 deletions

View file

@ -1,13 +1,13 @@
{
"files": {
"main.css": "./static/css/main.2fa7c03f.css",
"main.js": "./static/js/main.68f1bd69.js",
"main.css": "./static/css/main.1041c3d4.css",
"main.js": "./static/js/main.e54f9531.js",
"static/js/685.bebe1265.chunk.js": "./static/js/685.bebe1265.chunk.js",
"static/media/MetricsQL.md": "./static/media/MetricsQL.cb83d071da309a358bc0.md",
"index.html": "./index.html"
},
"entrypoints": [
"static/css/main.2fa7c03f.css",
"static/js/main.68f1bd69.js"
"static/css/main.1041c3d4.css",
"static/js/main.e54f9531.js"
]
}

View file

@ -1 +1 @@
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.68f1bd69.js"></script><link href="./static/css/main.2fa7c03f.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>
<!doctype html><html lang="en"><head><meta charset="utf-8"/><link rel="icon" href="./favicon.ico"/><meta name="viewport" content="width=device-width,initial-scale=1,maximum-scale=5"/><meta name="theme-color" content="#000000"/><meta name="description" content="UI for VictoriaMetrics"/><link rel="apple-touch-icon" href="./apple-touch-icon.png"/><link rel="icon" type="image/png" sizes="32x32" href="./favicon-32x32.png"><link rel="manifest" href="./manifest.json"/><title>VM UI</title><script src="./dashboards/index.js" type="module"></script><meta name="twitter:card" content="summary_large_image"><meta name="twitter:image" content="./preview.jpg"><meta name="twitter:title" content="UI for VictoriaMetrics"><meta name="twitter:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta name="twitter:site" content="@VictoriaMetrics"><meta property="og:title" content="Metric explorer for VictoriaMetrics"><meta property="og:description" content="Explore and troubleshoot your VictoriaMetrics data"><meta property="og:image" content="./preview.jpg"><meta property="og:type" content="website"><script defer="defer" src="./static/js/main.e54f9531.js"></script><link href="./static/css/main.1041c3d4.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -42,7 +42,7 @@ services:
# storing logs and serving read queries.
victorialogs:
container_name: victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
command:
- "--storageDataPath=/vlogs"
- "--httpListenAddr=:9428"

View file

@ -22,7 +22,7 @@ services:
- -beat.uri=http://filebeat-victorialogs:5066
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- victorialogs-filebeat-docker-vl:/vlogs
ports:

View file

@ -13,7 +13,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- victorialogs-filebeat-syslog-vl:/vlogs
ports:

View file

@ -11,7 +11,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- victorialogs-fluentbit-vl:/vlogs
ports:

View file

@ -14,7 +14,7 @@ services:
- "5140:5140"
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- victorialogs-logstash-vl:/vlogs
ports:

View file

@ -12,7 +12,7 @@ services:
- "5140:5140"
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- victorialogs-promtail-docker:/vlogs
ports:

View file

@ -22,7 +22,7 @@ services:
condition: service_healthy
victorialogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- victorialogs-vector-docker-vl:/vlogs
ports:

View file

@ -3,7 +3,7 @@ version: '3'
services:
# Run `make package-victoria-logs` to build victoria-logs image
vlogs:
image: docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
image: docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
volumes:
- vlogs:/vlogs
ports:

View file

@ -19,7 +19,12 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
## [v0.21.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.21.0-victorialogs)
Released at 2024-06-20
* FEATURE: [web UI](https://docs.victoriametrics.com/victorialogs/querying/#web-ui): add a bar chart displaying the number of log entries over a time range. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6404).
* FEATURE: expose `_stream_id` field, which uniquely identifies [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields). This field can be used for quick obtaining of all the logs belonging to a particular stream via [`_stream_id` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter).
## [v0.20.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.20.2-victorialogs)

View file

@ -427,7 +427,7 @@ See also:
### Stream filter
VictoriaLogs provides an optimized way to select log entries, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
VictoriaLogs provides an optimized way to select logs, which belong to particular [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
This can be done via `_stream:{...}` filter. The `{...}` may contain arbitrary
[Prometheus-compatible label selector](https://docs.victoriametrics.com/keyconcepts/#filtering)
over fields associated with [log streams](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields).
@ -456,9 +456,34 @@ Performance tips:
See also:
- [`_stream_id` filter](#_stream_id-filter)
- [Time filter](#time-filter)
- [Exact filter](#exact-filter)
### _stream_id filter
Every [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields) in VictoriaLogs is uniquely identified by the `_stream_id` field.
The `_stream_id:...` filter allows quickly selecting all the logs belonging to the particular stream.
For example, the following query selects all the logs, which belong to the [log stream](https://docs.victoriametrics.com/victorialogs/keyconcepts/#stream-fields)
with `_stream_id` equal to `0000007b000001c850d9950ea6196b1a4812081265faa1c7`:
```logsql
_stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7
```
If the log stream contains too many logs, then it is a good idea to limit the number of returned logs with a [time filter](#time-filter). For example, the following
query selects logs for the given stream for the last hour:
```logsql
_time:1h _stream_id:0000007b000001c850d9950ea6196b1a4812081265faa1c7
```
See also:
- [stream filter](#stream-filter)
### Word filter
The simplest LogsQL query consists of a single [word](#word) to search in log messages. For example, the following query matches

View file

@ -36,8 +36,8 @@ Just download archive for the needed Operating system and architecture, unpack i
For example, the following commands download VictoriaLogs archive for Linux/amd64, unpack and run it:
```sh
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.20.2-victorialogs/victoria-logs-linux-amd64-v0.20.2-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.20.2-victorialogs.tar.gz
curl -L -O https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v0.21.0-victorialogs/victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz
tar xzf victoria-logs-linux-amd64-v0.21.0-victorialogs.tar.gz
./victoria-logs-prod
```
@ -61,7 +61,7 @@ Here is the command to run VictoriaLogs in a Docker container:
```sh
docker run --rm -it -p 9428:9428 -v ./victoria-logs-data:/victoria-logs-data \
docker.io/victoriametrics/victoria-logs:v0.20.2-victorialogs
docker.io/victoriametrics/victoria-logs:v0.21.0-victorialogs
```
See also:

View file

@ -161,7 +161,7 @@ the search to a particular time range.
### Stream fields
Some [structured logging](#data-model) fields may uniquely identify the application instance, which generates log entries.
Some [structured logging](#data-model) fields may uniquely identify the application instance, which generates logs.
This may be either a single field such as `instance="host123:456"` or a set of fields such as
`{datacenter="...", env="...", job="...", instance="..."}` or
`{kubernetes.namespace="...", kubernetes.node.name="...", kubernetes.pod.name="...", kubernetes.container.name="..."}`.
@ -176,16 +176,18 @@ This provides the following benefits:
- Increased query performance, since VictoriaLogs needs to scan lower amounts of data
when [searching by stream fields](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
Every ingested log entry is associated with a log stream. The name of this stream is stored in `_stream` field.
This field has the format similar to [labels in Prometheus metrics](https://docs.victoriametrics.com/keyconcepts/#labels):
Every ingested log entry is associated with a log stream. Every log stream consists of two fields:
```
{field1="value1", ..., fieldN="valueN"}
```
- `_stream_id` - this is a unique identifier for the log stream. All the logs for the particular stream can be selected
via [`_stream_id:...` filter](https://docs.victoriametrics.com/victorialogs/logsql/#_stream_id-filter).
For example, if `host` and `app` fields are associated with the stream, then the `_stream` field will have `{host="host-123",app="my-app"}` value
for the log entry with `host="host-123"` and `app="my-app"` fields. The `_stream` field can be searched
with [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
- `_stream` - this field contains stream labels in the format similar to [labels in Prometheus metrics](https://docs.victoriametrics.com/keyconcepts/#labels):
```
{field1="value1", ..., fieldN="valueN"}
```
For example, if `host` and `app` fields are associated with the stream, then the `_stream` field will have `{host="host-123",app="my-app"}` value
for the log entry with `host="host-123"` and `app="my-app"` fields. The `_stream` field can be searched
with [stream filters](https://docs.victoriametrics.com/victorialogs/logsql/#stream-filter).
By default the value of `_stream` field is `{}`, since VictoriaLogs cannot determine automatically,
which fields uniquely identify every log stream. This may lead to not-so-optimal resource usage and query performance.

View file

@ -263,10 +263,15 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
br.addTimeColumn()
}
if !slices.Contains(unneededColumnNames, "_stream_id") {
// Add _stream_id column
br.addStreamIDColumn(bs)
}
if !slices.Contains(unneededColumnNames, "_stream") {
// Add _stream column
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
// Skip the current block, since the associated stream tags are missing
br.reset()
return
}
@ -315,6 +320,8 @@ func (br *blockResult) initAllColumns(bs *blockSearch, bm *bitmap) {
func (br *blockResult) initRequestedColumns(bs *blockSearch, bm *bitmap) {
for _, columnName := range bs.bsw.so.neededColumnNames {
switch columnName {
case "_stream_id":
br.addStreamIDColumn(bs)
case "_stream":
if !br.addStreamColumn(bs) {
// Skip the current block, since the associated stream tags are missing.
@ -485,6 +492,13 @@ func (br *blockResult) addTimeColumn() {
br.csInitialized = false
}
// addStreamIDColumn adds the `_stream_id` const column to br, with the value
// taken from the streamID of the block header currently pointed to by bs.
func (br *blockResult) addStreamIDColumn(bs *blockSearch) {
// Borrow a scratch buffer from the pool to marshal the stream id into.
bb := bbPool.Get()
bb.B = bs.bsw.bh.streamID.marshalString(bb.B)
// NOTE(review): the unsafe string aliases bb.B, which is returned to the pool
// right after this call — assumes addConstColumn copies the value; confirm.
br.addConstColumn("_stream_id", bytesutil.ToUnsafeString(bb.B))
bbPool.Put(bb)
}
func (br *blockResult) addStreamColumn(bs *blockSearch) bool {
if !bs.prevStreamID.equal(&bs.bsw.bh.streamID) {
return br.addStreamColumnSlow(bs)

View file

@ -5,6 +5,8 @@ import (
)
func TestFilterAnd(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",

View file

@ -21,8 +21,14 @@ type filterAnyCasePhrase struct {
phraseLowercaseOnce sync.Once
phraseLowercase string
phraseUppercaseOnce sync.Once
phraseUppercase string
tokensOnce sync.Once
tokens []string
tokensUppercaseOnce sync.Once
tokensUppercase []string
}
func (fp *filterAnyCasePhrase) String() string {
@ -42,6 +48,20 @@ func (fp *filterAnyCasePhrase) initTokens() {
fp.tokens = tokenizeStrings(nil, []string{fp.phrase})
}
// getTokensUppercase returns the uppercase variants of the filter tokens.
// The slice is computed lazily, at most once, via tokensUppercaseOnce.
func (fp *filterAnyCasePhrase) getTokensUppercase() []string {
fp.tokensUppercaseOnce.Do(fp.initTokensUppercase)
return fp.tokensUppercase
}
// initTokensUppercase populates fp.tokensUppercase with the uppercase
// variants of the cached tokens. It is intended to run exactly once,
// triggered through tokensUppercaseOnce.
func (fp *filterAnyCasePhrase) initTokensUppercase() {
	src := fp.getTokens()
	dst := make([]string, len(src))
	for i := range src {
		dst[i] = strings.ToUpper(src[i])
	}
	fp.tokensUppercase = dst
}
func (fp *filterAnyCasePhrase) getPhraseLowercase() string {
fp.phraseLowercaseOnce.Do(fp.initPhraseLowercase)
return fp.phraseLowercase
@ -51,6 +71,15 @@ func (fp *filterAnyCasePhrase) initPhraseLowercase() {
fp.phraseLowercase = strings.ToLower(fp.phrase)
}
// getPhraseUppercase returns the uppercase form of the filter phrase.
// The value is computed lazily, at most once, via phraseUppercaseOnce.
func (fp *filterAnyCasePhrase) getPhraseUppercase() string {
fp.phraseUppercaseOnce.Do(fp.initPhraseUppercase)
return fp.phraseUppercase
}
// initPhraseUppercase caches the uppercase form of fp.phrase.
// Invoked exactly once through phraseUppercaseOnce.
func (fp *filterAnyCasePhrase) initPhraseUppercase() {
fp.phraseUppercase = strings.ToUpper(fp.phrase)
}
func (fp *filterAnyCasePhrase) applyToBlockResult(br *blockResult, bm *bitmap) {
phraseLowercase := fp.getPhraseLowercase()
applyToBlockResultGeneric(br, bm, fp.fieldName, phraseLowercase, matchAnyCasePhrase)
@ -100,8 +129,9 @@ func (fp *filterAnyCasePhrase) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
case valueTypeIPv4:
matchIPv4ByPhrase(bs, ch, bm, phraseLowercase, tokens)
case valueTypeTimestampISO8601:
phraseUppercase := strings.ToUpper(fp.phrase)
matchTimestampISO8601ByPhrase(bs, ch, bm, phraseUppercase, tokens)
phraseUppercase := fp.getPhraseUppercase()
tokensUppercase := fp.getTokensUppercase()
matchTimestampISO8601ByPhrase(bs, ch, bm, phraseUppercase, tokensUppercase)
default:
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
}

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchAnyCasePhrase(t *testing.T) {
t.Parallel()
f := func(s, phraseLowercase string, resultExpected bool) {
t.Helper()
result := matchAnyCasePhrase(s, phraseLowercase)
@ -39,7 +41,11 @@ func TestMatchAnyCasePhrase(t *testing.T) {
}
func TestFilterAnyCasePhrase(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -113,6 +119,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "other-column",
@ -222,6 +230,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -277,6 +287,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -347,6 +359,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -412,6 +426,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -476,6 +492,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -540,6 +558,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -603,6 +623,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -708,6 +730,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -804,6 +828,8 @@ func TestFilterAnyCasePhrase(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -22,8 +22,14 @@ type filterAnyCasePrefix struct {
prefixLowercaseOnce sync.Once
prefixLowercase string
prefixUppercaseOnce sync.Once
prefixUppercase string
tokensOnce sync.Once
tokens []string
tokensUppercaseOnce sync.Once
tokensUppercase []string
}
func (fp *filterAnyCasePrefix) String() string {
@ -46,6 +52,20 @@ func (fp *filterAnyCasePrefix) initTokens() {
fp.tokens = getTokensSkipLast(fp.prefix)
}
// getTokensUppercase returns the uppercase variants of the filter tokens.
// The slice is computed lazily, at most once, via tokensUppercaseOnce.
func (fp *filterAnyCasePrefix) getTokensUppercase() []string {
fp.tokensUppercaseOnce.Do(fp.initTokensUppercase)
return fp.tokensUppercase
}
// initTokensUppercase populates fp.tokensUppercase with the uppercase
// variants of the cached tokens. It is intended to run exactly once,
// triggered through tokensUppercaseOnce.
func (fp *filterAnyCasePrefix) initTokensUppercase() {
	src := fp.getTokens()
	dst := make([]string, len(src))
	for i := range src {
		dst[i] = strings.ToUpper(src[i])
	}
	fp.tokensUppercase = dst
}
func (fp *filterAnyCasePrefix) getPrefixLowercase() string {
fp.prefixLowercaseOnce.Do(fp.initPrefixLowercase)
return fp.prefixLowercase
@ -55,6 +75,15 @@ func (fp *filterAnyCasePrefix) initPrefixLowercase() {
fp.prefixLowercase = strings.ToLower(fp.prefix)
}
// getPrefixUppercase returns the uppercase form of the filter prefix.
// The value is computed lazily, at most once, via prefixUppercaseOnce.
func (fp *filterAnyCasePrefix) getPrefixUppercase() string {
fp.prefixUppercaseOnce.Do(fp.initPrefixUppercase)
return fp.prefixUppercase
}
// initPrefixUppercase caches the uppercase form of fp.prefix.
// Invoked exactly once through prefixUppercaseOnce.
func (fp *filterAnyCasePrefix) initPrefixUppercase() {
fp.prefixUppercase = strings.ToUpper(fp.prefix)
}
func (fp *filterAnyCasePrefix) applyToBlockResult(br *blockResult, bm *bitmap) {
prefixLowercase := fp.getPrefixLowercase()
applyToBlockResultGeneric(br, bm, fp.fieldName, prefixLowercase, matchAnyCasePrefix)
@ -101,8 +130,9 @@ func (fp *filterAnyCasePrefix) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
case valueTypeIPv4:
matchIPv4ByPrefix(bs, ch, bm, prefixLowercase, tokens)
case valueTypeTimestampISO8601:
prefixUppercase := strings.ToUpper(fp.prefix)
matchTimestampISO8601ByPrefix(bs, ch, bm, prefixUppercase, tokens)
prefixUppercase := fp.getPrefixUppercase()
tokensUppercase := fp.getTokensUppercase()
matchTimestampISO8601ByPrefix(bs, ch, bm, prefixUppercase, tokensUppercase)
default:
logger.Panicf("FATAL: %s: unknown valueType=%d", bs.partPath(), ch.valueType)
}

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchAnyCasePrefix(t *testing.T) {
t.Parallel()
f := func(s, prefixLowercase string, resultExpected bool) {
t.Helper()
result := matchAnyCasePrefix(s, prefixLowercase)
@ -39,7 +41,11 @@ func TestMatchAnyCasePrefix(t *testing.T) {
}
func TestFilterAnyCasePrefix(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -131,6 +137,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "other-column",
@ -246,6 +254,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -301,6 +311,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -383,6 +395,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -448,6 +462,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -512,6 +528,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -576,6 +594,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -639,6 +659,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -750,6 +772,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -846,6 +870,8 @@ func TestFilterAnyCasePrefix(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -115,7 +115,7 @@ func (fr *filterDayRange) matchTimestampValue(timestamp int64) bool {
}
func (fr *filterDayRange) dayRangeOffset(timestamp int64) int64 {
timestamp += fr.offset
timestamp -= fr.offset
return timestamp % nsecsPerDay
}

View file

@ -5,6 +5,8 @@ import (
)
func TestFilterDayRange(t *testing.T) {
t.Parallel()
timestamps := []int64{
1,
9,
@ -35,7 +37,7 @@ func TestFilterDayRange(t *testing.T) {
ft = &filterDayRange{
start: 1,
end: 1,
offset: 9,
offset: 8,
}
testFilterMatchForTimestamps(t, timestamps, ft, []int{1})
@ -44,7 +46,7 @@ func TestFilterDayRange(t *testing.T) {
end: 10,
offset: -9,
}
testFilterMatchForTimestamps(t, timestamps, ft, []int{1})
testFilterMatchForTimestamps(t, timestamps, ft, []int{0})
ft = &filterDayRange{
start: 2,

View file

@ -5,7 +5,11 @@ import (
)
func TestFilterExactPrefix(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -55,6 +59,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -106,6 +112,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -149,6 +157,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -201,6 +211,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -254,6 +266,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -307,6 +321,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -360,6 +376,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -384,7 +402,7 @@ func TestFilterExactPrefix(t *testing.T) {
fieldName: "foo",
prefix: "12",
}
testFilterMatchForColumns(t, columns, fep, "foo", []int{0, 1, 5})
testFilterMatchForColumns(t, columns, fep, "foo", []int{0, 1, 5, 9})
fep = &filterExactPrefix{
fieldName: "foo",
@ -413,6 +431,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -470,6 +490,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -495,7 +517,7 @@ func TestFilterExactPrefix(t *testing.T) {
fieldName: "foo",
prefix: "127.0.",
}
testFilterMatchForColumns(t, columns, fep, "foo", []int{2, 4, 5, 7})
testFilterMatchForColumns(t, columns, fep, "foo", []int{2, 4, 5, 6, 7})
fep = &filterExactPrefix{
fieldName: "foo",
@ -518,6 +540,8 @@ func TestFilterExactPrefix(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -5,7 +5,11 @@ import (
)
func TestFilterExact(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -43,6 +47,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -88,6 +94,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -131,6 +139,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -183,6 +193,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -236,6 +248,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -289,6 +303,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -342,6 +358,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -395,6 +413,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -470,6 +490,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -530,6 +552,8 @@ func TestFilterExact(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -7,7 +7,11 @@ import (
)
func TestFilterIn(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -75,6 +79,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -126,6 +132,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -181,6 +189,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -233,6 +243,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -298,6 +310,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -363,6 +377,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -428,6 +444,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -487,6 +505,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -568,6 +588,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -634,6 +656,8 @@ func TestFilterIn(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchIPv4Range(t *testing.T) {
t.Parallel()
f := func(s string, minValue, maxValue uint32, resultExpected bool) {
t.Helper()
result := matchIPv4Range(s, minValue, maxValue)
@ -28,7 +30,11 @@ func TestMatchIPv4Range(t *testing.T) {
}
func TestFilterIPv4Range(t *testing.T) {
t.Parallel()
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -79,6 +85,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -134,6 +142,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -177,6 +187,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -206,6 +218,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -235,6 +249,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -264,6 +280,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -293,6 +311,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -322,6 +342,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -374,6 +396,8 @@ func TestFilterIPv4Range(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchLenRange(t *testing.T) {
t.Parallel()
f := func(s string, minLen, maxLen uint64, resultExpected bool) {
t.Helper()
result := matchLenRange(s, minLen, maxLen)
@ -31,7 +33,11 @@ func TestMatchLenRange(t *testing.T) {
}
func TestFilterLenRange(t *testing.T) {
t.Parallel()
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -75,6 +81,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -115,6 +123,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -151,6 +161,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -176,7 +188,7 @@ func TestFilterLenRange(t *testing.T) {
minLen: 2,
maxLen: 2,
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
// mismatch
fr = &filterLenRange{
@ -195,6 +207,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -220,7 +234,7 @@ func TestFilterLenRange(t *testing.T) {
minLen: 2,
maxLen: 2,
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
// mismatch
fr = &filterLenRange{
@ -239,6 +253,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -264,7 +280,7 @@ func TestFilterLenRange(t *testing.T) {
minLen: 2,
maxLen: 2,
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
// mismatch
fr = &filterLenRange{
@ -283,6 +299,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -308,7 +326,7 @@ func TestFilterLenRange(t *testing.T) {
minLen: 2,
maxLen: 2,
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{2, 3, 6})
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 2, 5})
// mismatch
fr = &filterLenRange{
@ -327,6 +345,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -364,6 +384,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -402,6 +424,8 @@ func TestFilterLenRange(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -5,6 +5,8 @@ import (
)
func TestFilterNot(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",

View file

@ -5,6 +5,8 @@ import (
)
func TestFilterOr(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchPhrase(t *testing.T) {
t.Parallel()
f := func(s, phrase string, resultExpected bool) {
t.Helper()
result := matchPhrase(s, phrase)
@ -44,7 +46,11 @@ func TestMatchPhrase(t *testing.T) {
}
func TestFilterPhrase(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -118,6 +124,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "other-column",
@ -227,6 +235,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -282,6 +292,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -352,6 +364,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -417,6 +431,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -481,6 +497,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -545,6 +563,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -608,6 +628,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -713,6 +735,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -809,6 +833,8 @@ func TestFilterPhrase(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchPrefix(t *testing.T) {
t.Parallel()
f := func(s, prefix string, resultExpected bool) {
t.Helper()
result := matchPrefix(s, prefix)
@ -44,7 +46,11 @@ func TestMatchPrefix(t *testing.T) {
}
func TestFilterPrefix(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -130,6 +136,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "other-column",
@ -245,6 +253,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -300,6 +310,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -382,6 +394,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -447,6 +461,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -511,6 +527,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -575,6 +593,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -638,6 +658,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -749,6 +771,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -845,6 +869,8 @@ func TestFilterPrefix(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -5,7 +5,11 @@ import (
)
func TestFilterRange(t *testing.T) {
t.Parallel()
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -77,6 +81,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -146,6 +152,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -210,6 +218,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -276,6 +286,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -341,6 +353,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -406,6 +420,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -478,6 +494,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -550,6 +568,8 @@ func TestFilterRange(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -570,16 +590,17 @@ func TestFilterRange(t *testing.T) {
},
}
// range filter always mismatches ipv4
fr := &filterRange{
fieldName: "foo",
minValue: -100,
maxValue: 100,
}
testFilterMatchForColumns(t, columns, fr, "foo", nil)
testFilterMatchForColumns(t, columns, fr, "foo", []int{1})
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -8,7 +8,11 @@ import (
)
func TestFilterRegexp(t *testing.T) {
t.Parallel()
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -54,6 +58,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -92,6 +98,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -126,6 +134,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -161,6 +171,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -196,6 +208,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -231,6 +245,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -266,6 +282,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -301,6 +319,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -337,6 +357,8 @@ func TestFilterRegexp(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",
@ -371,6 +393,8 @@ func TestFilterRegexp(t *testing.T) {
}
func TestSkipFirstLastToken(t *testing.T) {
t.Parallel()
f := func(s, resultExpected string) {
t.Helper()

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchSequence(t *testing.T) {
t.Parallel()
f := func(s string, phrases []string, resultExpected bool) {
t.Helper()
result := matchSequence(s, phrases)
@ -28,7 +30,11 @@ func TestMatchSequence(t *testing.T) {
}
func TestFilterSequence(t *testing.T) {
t.Parallel()
t.Run("single-row", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -102,6 +108,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -153,6 +161,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -208,6 +218,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -278,6 +290,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -349,6 +363,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -420,6 +436,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -491,6 +509,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -562,6 +582,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -643,6 +665,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -739,6 +763,8 @@ func TestFilterSequence(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -0,0 +1,84 @@
package logstorage
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
// filterStreamID is the filter for `_stream_id:id`
type filterStreamID struct {
	// streamIDStr is the expected _stream_id value in hex-encoded string form.
	streamIDStr string
}
// String returns LogsQL representation of the `_stream_id:...` filter.
func (fs *filterStreamID) String() string {
	quoted := quoteTokenIfNeeded(fs.streamIDStr)
	return "_stream_id:" + quoted
}
// updateNeededFields marks the _stream_id field as needed by the filter,
// so the search reads it from storage.
func (fs *filterStreamID) updateNeededFields(neededFields fieldsSet) {
	neededFields.add("_stream_id")
}
// applyToBlockResult clears bits in bm for rows whose _stream_id column value
// doesn't equal fs.streamIDStr.
func (fs *filterStreamID) applyToBlockResult(br *blockResult, bm *bitmap) {
	c := br.getColumnByName("_stream_id")
	if c.isConst {
		// A const column matches either all rows or none of them.
		if c.valuesEncoded[0] != fs.streamIDStr {
			bm.resetBits()
		}
		return
	}
	if c.isTime {
		bm.resetBits()
		return
	}
	switch c.valueType {
	case valueTypeString:
		values := c.getValues(br)
		bm.forEachSetBit(func(idx int) bool {
			return values[idx] == fs.streamIDStr
		})
	case valueTypeDict:
		// Pre-compute the match flag per dictionary entry, then look it up per row.
		bb := bbPool.Get()
		for _, dictValue := range c.dictValues {
			match := byte(0)
			if dictValue == fs.streamIDStr {
				match = 1
			}
			bb.B = append(bb.B, match)
		}
		valuesEncoded := c.getValuesEncoded(br)
		bm.forEachSetBit(func(idx int) bool {
			dictIdx := valuesEncoded[idx][0]
			return bb.B[dictIdx] == 1
		})
		bbPool.Put(bb)
	case valueTypeUint8, valueTypeUint16, valueTypeUint32, valueTypeUint64,
		valueTypeFloat64, valueTypeIPv4, valueTypeTimestampISO8601:
		// Numeric-encoded columns cannot hold a hex-encoded stream id.
		bm.resetBits()
	default:
		logger.Panicf("FATAL: unknown valueType=%d", c.valueType)
	}
}
// applyToBlockSearch clears all bits in bm if the block's streamID
// doesn't match fs.streamIDStr. Every row in a block shares one streamID,
// so the check is per-block rather than per-row.
func (fs *filterStreamID) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
	bb := bbPool.Get()
	bb.B = bs.bsw.bh.streamID.marshalString(bb.B)
	matches := string(bb.B) == fs.streamIDStr
	bbPool.Put(bb)

	if !matches {
		bm.resetBits()
	}
}

View file

@ -0,0 +1,87 @@
package logstorage
import (
"fmt"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestFilterStreamID(t *testing.T) {
	t.Parallel()

	// match
	// NOTE(review): the hex stream ids below are the ids produced by
	// generateTestLogStreams for (host-i%3, app=foobar) under tenant 123/456 —
	// confirm if the stream hashing scheme changes.
	ft := &filterStreamID{
		streamIDStr: "0000007b000001c8302bc96e02e54e5524b3a68ec271e55e",
	}
	testFilterMatchForStreamID(t, ft, []int{0, 3, 6, 9})

	ft = &filterStreamID{
		streamIDStr: "0000007b000001c850d9950ea6196b1a4812081265faa1c7",
	}
	testFilterMatchForStreamID(t, ft, []int{1, 4, 7})

	// mismatch: not a valid hex-encoded stream id, so nothing matches
	ft = &filterStreamID{
		streamIDStr: "abc",
	}
	testFilterMatchForStreamID(t, ft, nil)
}
// testFilterMatchForStreamID creates a temporary storage with 10 rows spread
// across 3 streams, applies f and verifies that exactly the rows at
// expectedRowIdxs match. The storage is removed before returning.
func testFilterMatchForStreamID(t *testing.T, f filter, expectedRowIdxs []int) {
	t.Helper()

	// t.Name() gives every test its own storage dir, so parallel tests don't collide.
	storagePath := t.Name()
	cfg := &StorageConfig{
		Retention: 100 * 365 * time.Duration(nsecsPerDay),
	}
	s := MustOpenStorage(storagePath, cfg)

	tenantID := TenantID{
		AccountID: 123,
		ProjectID: 456,
	}
	getMsgValue := func(i int) string {
		return fmt.Sprintf("some message value %d", i)
	}
	generateTestLogStreams(s, tenantID, getMsgValue, 10, 3)

	// Row i carries message getMsgValue(i) at timestamp i*100 (see generateTestLogStreams).
	expectedResults := make([]string, len(expectedRowIdxs))
	expectedTimestamps := make([]int64, len(expectedRowIdxs))
	for i, idx := range expectedRowIdxs {
		expectedResults[i] = getMsgValue(idx)
		expectedTimestamps[i] = int64(idx * 100)
	}

	testFilterMatchForStorage(t, s, tenantID, f, "_msg", expectedResults, expectedTimestamps)

	// Close and delete the test storage
	s.MustClose()
	fs.MustRemoveAll(storagePath)
}
// generateTestLogStreams ingests rowsCount rows into s under tenantID.
// Row i gets message getMsgValue(i), timestamp i*100 and is routed to one of
// streamsCount streams via its host field (host-{i%streamsCount}).
func generateTestLogStreams(s *Storage, tenantID TenantID, getMsgValue func(int) string, rowsCount, streamsCount int) {
	streamFields := []string{"host", "app"}
	lr := GetLogRows(streamFields, nil)
	var fields []Field
	for i := 0; i < rowsCount; i++ {
		fields = fields[:0]
		fields = append(fields,
			Field{
				Name:  "_msg",
				Value: getMsgValue(i),
			},
			Field{
				Name:  "host",
				Value: fmt.Sprintf("host-%d", i%streamsCount),
			},
			Field{
				Name:  "app",
				Value: "foobar",
			},
		)
		lr.MustAdd(tenantID, int64(i*100), fields)
	}
	s.MustAddRows(lr)
	PutLogRows(lr)
}

View file

@ -5,6 +5,8 @@ import (
)
func TestMatchStringRange(t *testing.T) {
t.Parallel()
f := func(s, minValue, maxValue string, resultExpected bool) {
t.Helper()
result := matchStringRange(s, minValue, maxValue)
@ -22,7 +24,11 @@ func TestMatchStringRange(t *testing.T) {
}
func TestFilterStringRange(t *testing.T) {
t.Parallel()
t.Run("const-column", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -45,11 +51,18 @@ func TestFilterStringRange(t *testing.T) {
fr = &filterStringRange{
fieldName: "foo",
minValue: "127.0.0.1",
maxValue: "127.0.0.1",
maxValue: "127.0.0.2",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{0, 1, 2})
// mismatch
fr = &filterStringRange{
fieldName: "foo",
minValue: "127.0.0.1",
maxValue: "127.0.0.1",
}
testFilterMatchForColumns(t, columns, fr, "foo", nil)
fr = &filterStringRange{
fieldName: "foo",
minValue: "",
@ -73,6 +86,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("dict", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -100,9 +115,9 @@ func TestFilterStringRange(t *testing.T) {
fr = &filterStringRange{
fieldName: "foo",
minValue: "127",
maxValue: "127.0.0.1",
maxValue: "127.0.0.2",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 7})
testFilterMatchForColumns(t, columns, fr, "foo", []int{1, 6, 7})
// mismatch
fr = &filterStringRange{
@ -128,6 +143,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("strings", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -135,8 +152,8 @@ func TestFilterStringRange(t *testing.T) {
"A FOO",
"a 10",
"127.0.0.1",
"20",
"15.5",
"200",
"155.5",
"-5",
"a fooBaR",
"a 127.0.0.1 dfff",
@ -171,6 +188,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("uint8", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -194,9 +213,9 @@ func TestFilterStringRange(t *testing.T) {
fr := &filterStringRange{
fieldName: "foo",
minValue: "33",
maxValue: "5",
maxValue: "500",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
// mismatch
fr = &filterStringRange{
@ -222,6 +241,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("uint16", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -245,9 +266,9 @@ func TestFilterStringRange(t *testing.T) {
fr := &filterStringRange{
fieldName: "foo",
minValue: "33",
maxValue: "5",
maxValue: "555",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
// mismatch
fr = &filterStringRange{
@ -273,6 +294,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("uint32", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -296,9 +319,9 @@ func TestFilterStringRange(t *testing.T) {
fr := &filterStringRange{
fieldName: "foo",
minValue: "33",
maxValue: "5",
maxValue: "555",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
// mismatch
fr = &filterStringRange{
@ -324,6 +347,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("uint64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -347,9 +372,9 @@ func TestFilterStringRange(t *testing.T) {
fr := &filterStringRange{
fieldName: "foo",
minValue: "33",
maxValue: "5",
maxValue: "5555",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
// mismatch
fr = &filterStringRange{
@ -375,6 +400,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("float64", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -398,15 +425,9 @@ func TestFilterStringRange(t *testing.T) {
fr := &filterStringRange{
fieldName: "foo",
minValue: "33",
maxValue: "5",
maxValue: "555",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{9, 10})
fr = &filterStringRange{
fieldName: "foo",
minValue: "-0",
maxValue: "-1",
}
testFilterMatchForColumns(t, columns, fr, "foo", []int{6})
testFilterMatchForColumns(t, columns, fr, "foo", []int{0})
// mismatch
fr = &filterStringRange{
@ -432,6 +453,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("ipv4", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "foo",
@ -491,6 +514,8 @@ func TestFilterStringRange(t *testing.T) {
})
t.Run("timestamp-iso8601", func(t *testing.T) {
t.Parallel()
columns := []column{
{
name: "_msg",

View file

@ -2,7 +2,11 @@ package logstorage
import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
@ -155,8 +159,10 @@ func testFilterMatchForColumns(t *testing.T, columns []column, f filter, neededC
t.Helper()
// Create the test storage
const storagePath = "testFilterMatchForColumns"
cfg := &StorageConfig{}
storagePath := t.Name()
cfg := &StorageConfig{
Retention: time.Duration(100 * 365 * nsecsPerDay),
}
s := MustOpenStorage(storagePath, cfg)
// Generate rows
@ -187,34 +193,58 @@ func testFilterMatchForColumns(t *testing.T, columns []column, f filter, neededC
fs.MustRemoveAll(storagePath)
}
func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f filter, neededColumnName string, expectedResults []string, expectedTimestamps []int64) {
func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f filter, neededColumnName string, expectedValues []string, expectedTimestamps []int64) {
t.Helper()
so := &genericSearchOptions{
tenantIDs: []TenantID{tenantID},
filter: f,
neededColumnNames: []string{neededColumnName},
neededColumnNames: []string{neededColumnName, "_time"},
}
workersCount := 3
type result struct {
value string
timestamp int64
}
var resultsMu sync.Mutex
var results []result
const workersCount = 3
s.search(workersCount, so, nil, func(_ uint, br *blockResult) {
// Verify columns
cs := br.getColumns()
if len(cs) != 1 {
t.Fatalf("unexpected number of columns in blockResult; got %d; want 1", len(cs))
if len(cs) != 2 {
t.Fatalf("unexpected number of columns in blockResult; got %d; want 2", len(cs))
}
results := cs[0].getValues(br)
if !reflect.DeepEqual(results, expectedResults) {
t.Fatalf("unexpected results matched;\ngot\n%q\nwant\n%q", results, expectedResults)
}
// Verify timestamps
if br.timestamps == nil {
br.timestamps = []int64{}
}
if !reflect.DeepEqual(br.timestamps, expectedTimestamps) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", br.timestamps, expectedTimestamps)
values := cs[0].getValues(br)
resultsMu.Lock()
for i, v := range values {
results = append(results, result{
value: strings.Clone(v),
timestamp: br.timestamps[i],
})
}
resultsMu.Unlock()
})
sort.Slice(results, func(i, j int) bool {
return results[i].timestamp < results[j].timestamp
})
timestamps := make([]int64, len(results))
values := make([]string, len(results))
for i, r := range results {
timestamps[i] = r.timestamp
values[i] = r.value
}
if !reflect.DeepEqual(timestamps, expectedTimestamps) {
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", timestamps, expectedTimestamps)
}
if !reflect.DeepEqual(values, expectedValues) {
t.Fatalf("unexpected values;\ngot\n%q\nwant\n%q", values, expectedValues)
}
}
func generateRowsFromColumns(s *Storage, tenantID TenantID, columns []column) {

View file

@ -3,11 +3,14 @@ package logstorage
import (
"fmt"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestFilterTime(t *testing.T) {
t.Parallel()
timestamps := []int64{
1,
9,
@ -89,8 +92,10 @@ func testFilterMatchForTimestamps(t *testing.T, timestamps []int64, f filter, ex
t.Helper()
// Create the test storage
const storagePath = "testFilterMatchForTimestamps"
cfg := &StorageConfig{}
storagePath := t.Name()
cfg := &StorageConfig{
Retention: 100 * 365 * time.Duration(nsecsPerDay),
}
s := MustOpenStorage(storagePath, cfg)
// Generate rows

View file

@ -117,12 +117,12 @@ func (fr *filterWeekRange) matchTimestampValue(timestamp int64) bool {
}
func (fr *filterWeekRange) weekday(timestamp int64) time.Weekday {
timestamp += fr.offset
timestamp -= fr.offset
return time.Unix(0, timestamp).UTC().Weekday()
}
func (fr *filterWeekRange) applyToBlockSearch(bs *blockSearch, bm *bitmap) {
if fr.startDay > fr.endDay || fr.startDay > time.Saturday || fr.endDay < time.Monday {
if fr.startDay > fr.endDay {
bm.resetBits()
return
}

View file

@ -6,12 +6,15 @@ import (
)
func TestFilterWeekRange(t *testing.T) {
t.Parallel()
sunday := time.Date(2024, 6, 9, 1, 0, 0, 0, time.UTC).UnixNano()
timestamps := []int64{
0,
1 * nsecsPerDay,
2 * nsecsPerDay,
4 * nsecsPerDay,
6 * nsecsPerDay,
sunday,
sunday + 1*nsecsPerDay,
sunday + 2*nsecsPerDay,
sunday + 4*nsecsPerDay,
sunday + 6*nsecsPerDay,
}
// match
@ -36,16 +39,16 @@ func TestFilterWeekRange(t *testing.T) {
ft = &filterWeekRange{
startDay: time.Monday,
endDay: time.Monday,
offset: 2 * nsecsPerDay,
offset: 3 * nsecsPerDay,
}
testFilterMatchForTimestamps(t, timestamps, ft, []int{2})
testFilterMatchForTimestamps(t, timestamps, ft, []int{3})
ft = &filterWeekRange{
startDay: time.Monday,
endDay: time.Monday,
offset: -2 * nsecsPerDay,
}
testFilterMatchForTimestamps(t, timestamps, ft, []int{1})
testFilterMatchForTimestamps(t, timestamps, ft, []int{4})
ft = &filterWeekRange{
startDay: time.Sunday,
@ -68,9 +71,9 @@ func TestFilterWeekRange(t *testing.T) {
testFilterMatchForTimestamps(t, timestamps, ft, nil)
ft = &filterWeekRange{
startDay: time.Saturday,
endDay: time.Saturday,
offset: -2 * nsecsPerHour,
startDay: time.Friday,
endDay: time.Friday,
offset: -1 * nsecsPerHour,
}
testFilterMatchForTimestamps(t, timestamps, ft, nil)
}

View file

@ -9,7 +9,9 @@ import (
)
func TestStorageSearchStreamIDs(t *testing.T) {
const path = "TestStorageSearchStreamIDs"
t.Parallel()
path := t.Name()
const partitionName = "foobar"
s := newTestStorage()
mustCreateIndexdb(path)

View file

@ -3,6 +3,7 @@ package logstorage
import (
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
@ -234,6 +235,51 @@ func (q *Query) String() string {
return s
}
// getSortedStreamIDs extracts a sorted list of streamIDs from the query filter
// when the filter (or one of the sub-filters of a top-level `and`) is composed
// solely of `_stream_id` filters. It returns nil when no such list can be derived.
func (q *Query) getSortedStreamIDs() []streamID {
	fa, isAnd := q.f.(*filterAnd)
	if !isAnd {
		streamIDs, _ := getSortedStreamIDsFromFilterOr(q.f)
		return streamIDs
	}
	// Use the first `and` sub-filter that yields streamIDs.
	for _, f := range fa.filters {
		if streamIDs, ok := getSortedStreamIDsFromFilterOr(f); ok {
			return streamIDs
		}
	}
	return nil
}
// getSortedStreamIDsFromFilterOr extracts sorted streamIDs from f when f is
// a single `_stream_id` filter or an `or` consisting only of `_stream_id` filters.
//
// The returned bool reports whether f had the expected shape and streamIDs
// may be used for the search. Note the asymmetry: a single `_stream_id` filter
// with an unparsable id returns (nil, true) — the filter matches nothing —
// while an `or` where every id is unparsable returns ok=false.
func getSortedStreamIDsFromFilterOr(f filter) ([]streamID, bool) {
	switch t := f.(type) {
	case *filterOr:
		var streamIDs []streamID
		for _, f := range t.filters {
			fs, ok := f.(*filterStreamID)
			if !ok {
				// A non-`_stream_id` sub-filter disqualifies the whole `or`.
				return nil, false
			}
			var sid streamID
			// Unparsable ids are silently skipped.
			if sid.tryUnmarshalFromString(fs.streamIDStr) {
				streamIDs = append(streamIDs, sid)
			}
		}
		sort.Slice(streamIDs, func(i, j int) bool {
			return streamIDs[i].less(&streamIDs[j])
		})
		return streamIDs, len(streamIDs) > 0
	case *filterStreamID:
		var sid streamID
		if !sid.tryUnmarshalFromString(t.streamIDStr) {
			return nil, true
		}
		return []streamID{sid}, true
	default:
		return nil, false
	}
}
// AddCountByTimePipe adds '| stats by (_time:step offset off, field1, ..., fieldN) count() hits' to the end of q.
func (q *Query) AddCountByTimePipe(step, off int64, fields []string) {
{
@ -773,6 +819,8 @@ func parseFilterForPhrase(lex *lexer, phrase, fieldName string) (filter, error)
switch fieldName {
case "_time":
return parseFilterTimeGeneric(lex)
case "_stream_id":
return parseFilterStreamID(lex)
case "_stream":
return parseFilterStream(lex)
default:
@ -1782,6 +1830,17 @@ func stripTimezoneSuffix(s string) string {
return s[:len(s)-len(tz)]
}
// parseFilterStreamID parses the value of a `_stream_id:...` filter from lex.
// The id is kept as a raw string; it is validated lazily when streamIDs are
// extracted from the query.
func parseFilterStreamID(lex *lexer) (*filterStreamID, error) {
	streamIDStr, err := getCompoundToken(lex)
	if err != nil {
		return nil, err
	}
	return &filterStreamID{
		streamIDStr: streamIDStr,
	}, nil
}
func parseFilterStream(lex *lexer) (*filterStream, error) {
sf, err := parseStreamFilter(lex)
if err != nil {

View file

@ -696,6 +696,10 @@ func TestParseQuerySuccess(t *testing.T) {
// empty filter
f(`"" or foo:"" and not bar:""`, `"" or foo:"" !bar:""`)
// _stream_id filter
f(`_stream_id:foo`, `_stream_id:foo`)
f(`_stream_id:foo-bar/b:az`, `_stream_id:"foo-bar/b:az"`)
// _stream filters
f(`_stream:{}`, `_stream:{}`)
f(`_stream:{foo="bar", baz=~"x" OR or!="b", "x=},"="d}{"}`, `_stream:{foo="bar",baz=~"x" or "or"!="b","x=},"="d}{"}`)
@ -1238,6 +1242,9 @@ func TestParseQueryFailure(t *testing.T) {
f(`'foo`)
f("`foo")
// invalid _stream_id filters
f("_stream_id:(foo)")
// invalid _stream filters
f("_stream:")
f("_stream:{")

View file

@ -10,7 +10,9 @@ import (
)
func TestPartitionLifecycle(t *testing.T) {
const path = "TestPartitionLifecycle"
t.Parallel()
path := t.Name()
var ddbStats DatadbStats
s := newTestStorage()
@ -50,7 +52,9 @@ func TestPartitionLifecycle(t *testing.T) {
}
func TestPartitionMustAddRowsSerial(t *testing.T) {
const path = "TestPartitionMustAddRowsSerial"
t.Parallel()
path := t.Name()
var ddbStats DatadbStats
s := newTestStorage()
@ -132,7 +136,9 @@ func TestPartitionMustAddRowsSerial(t *testing.T) {
}
func TestPartitionMustAddRowsConcurrent(t *testing.T) {
const path = "TestPartitionMustAddRowsConcurrent"
t.Parallel()
path := t.Name()
s := newTestStorage()
mustCreatePartition(path)

View file

@ -18,6 +18,10 @@ type genericSearchOptions struct {
// tenantIDs must contain the list of tenantIDs for the search.
tenantIDs []TenantID
// streamIDs is an optional sorted list of streamIDs for the search.
// If it is empty, then the search is performed by tenantIDs
streamIDs []streamID
// filter is the filter to use for the search
filter filter
@ -101,9 +105,11 @@ func (s *Storage) RunQuery(ctx context.Context, tenantIDs []TenantID, q *Query,
}
func (s *Storage) runQuery(ctx context.Context, tenantIDs []TenantID, q *Query, writeBlockResultFunc func(workerID uint, br *blockResult)) error {
streamIDs := q.getSortedStreamIDs()
neededColumnNames, unneededColumnNames := q.getNeededColumns()
so := &genericSearchOptions{
tenantIDs: tenantIDs,
streamIDs: streamIDs,
filter: q.f,
neededColumnNames: neededColumnNames,
unneededColumnNames: unneededColumnNames,
@ -653,6 +659,12 @@ func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter,
var streamIDs []streamID
if sf != nil {
streamIDs = pt.idb.searchStreamIDs(tenantIDs, sf)
if len(so.streamIDs) > 0 {
streamIDs = intersectStreamIDs(streamIDs, so.streamIDs)
}
tenantIDs = nil
} else if len(so.streamIDs) > 0 {
streamIDs = getStreamIDsForTenantIDs(so.streamIDs, tenantIDs)
tenantIDs = nil
}
if hasStreamFilters(f) {
@ -671,6 +683,36 @@ func (pt *partition) search(minTimestamp, maxTimestamp int64, sf *StreamFilter,
return pt.ddb.search(soInternal, workCh, stopCh)
}
// intersectStreamIDs returns the elements of a that are also present in b,
// preserving the order of a.
func intersectStreamIDs(a, b []streamID) []streamID {
	bSet := make(map[streamID]struct{}, len(b))
	for _, sid := range b {
		bSet[sid] = struct{}{}
	}
	intersection := make([]streamID, 0, len(a))
	for _, sid := range a {
		if _, ok := bSet[sid]; ok {
			intersection = append(intersection, sid)
		}
	}
	return intersection
}
// getStreamIDsForTenantIDs returns the streamIDs that belong to any of the
// given tenantIDs, preserving the original order.
func getStreamIDsForTenantIDs(streamIDs []streamID, tenantIDs []TenantID) []streamID {
	tenants := make(map[TenantID]struct{}, len(tenantIDs))
	for _, tid := range tenantIDs {
		tenants[tid] = struct{}{}
	}
	filtered := make([]streamID, 0, len(streamIDs))
	for _, sid := range streamIDs {
		if _, ok := tenants[sid.tenantID]; ok {
			filtered = append(filtered, sid)
		}
	}
	return filtered
}
func hasStreamFilters(f filter) bool {
visitFunc := func(f filter) bool {
_, ok := f.(*filterStream)

View file

@ -14,7 +14,9 @@ import (
)
func TestStorageRunQuery(t *testing.T) {
const path = "TestStorageRunQuery"
t.Parallel()
path := t.Name()
const tenantsCount = 11
const streamsPerTenant = 3
@ -322,6 +324,7 @@ func TestStorageRunQuery(t *testing.T) {
resultExpected := []ValueWithHits{
{"_msg", 1155},
{"_stream", 1155},
{"_stream_id", 1155},
{"_time", 1155},
{"instance", 1155},
{"job", 1155},
@ -343,6 +346,7 @@ func TestStorageRunQuery(t *testing.T) {
resultExpected := []ValueWithHits{
{"_msg", 385},
{"_stream", 385},
{"_stream_id", 385},
{"_time", 385},
{"instance", 385},
{"job", 385},
@ -635,7 +639,9 @@ func mustParseQuery(query string) *Query {
}
func TestStorageSearch(t *testing.T) {
const path = "TestStorageSearch"
t.Parallel()
path := t.Name()
const tenantsCount = 11
const streamsPerTenant = 3
@ -962,6 +968,8 @@ func TestStorageSearch(t *testing.T) {
}
func TestParseStreamFieldsSuccess(t *testing.T) {
t.Parallel()
f := func(s, resultExpected string) {
t.Helper()

View file

@ -7,8 +7,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
)
func TestStorageLifecycle(_ *testing.T) {
const path = "TestStorageLifecycle"
func TestStorageLifecycle(t *testing.T) {
t.Parallel()
path := t.Name()
for i := 0; i < 3; i++ {
cfg := &StorageConfig{}
@ -19,7 +21,9 @@ func TestStorageLifecycle(_ *testing.T) {
}
func TestStorageMustAddRows(t *testing.T) {
const path = "TestStorageMustAddRows"
t.Parallel()
path := t.Name()
var sStats StorageStats

View file

@ -1,6 +1,7 @@
package logstorage
import (
"encoding/hex"
"fmt"
)
@ -25,6 +26,27 @@ func (sid *streamID) reset() {
*sid = streamID{}
}
// marshalString returns _stream_id value for the given sid.
//
// The result is the hex encoding of sid's binary representation, appended to dst.
func (sid *streamID) marshalString(dst []byte) []byte {
	// Marshal into a pooled scratch buffer to avoid a per-call allocation.
	bb := bbPool.Get()
	bb.B = sid.marshal(bb.B)
	dst = hex.AppendEncode(dst, bb.B)
	bbPool.Put(bb)
	return dst
}
// tryUnmarshalFromString attempts to decode sid from the hex string s,
// which must be the exact output of marshalString. It returns false when
// s isn't valid hex or doesn't decode to exactly one streamID.
func (sid *streamID) tryUnmarshalFromString(s string) bool {
	data, err := hex.DecodeString(s)
	if err != nil {
		return false
	}
	tail, err := sid.unmarshal(data)
	return err == nil && len(tail) == 0
}
// String returns human-readable representation for sid.
func (sid *streamID) String() string {
return fmt.Sprintf("(tenant_id=%s, id=%s)", &sid.tenantID, &sid.id)

View file

@ -5,6 +5,36 @@ import (
"testing"
)
// TestStreamIDMarshalUnmarshalString verifies that marshalString and
// tryUnmarshalFromString round-trip: unmarshal(marshal(sid)) re-marshals
// to the identical string.
func TestStreamIDMarshalUnmarshalString(t *testing.T) {
	f := func(sid *streamID) {
		t.Helper()

		s := string(sid.marshalString(nil))
		var sid2 streamID
		if !sid2.tryUnmarshalFromString(s) {
			t.Fatalf("cannot unmarshal streamID from %q", s)
		}
		// Compare marshaled forms instead of struct equality.
		s2 := string(sid2.marshalString(nil))
		if s != s2 {
			t.Fatalf("unexpected marshaled streamID; got %s; want %s", s2, s)
		}
	}

	// zero value must round-trip too
	f(&streamID{})
	f(&streamID{
		tenantID: TenantID{
			AccountID: 123,
			ProjectID: 456,
		},
		id: u128{
			lo: 89,
			hi: 344334,
		},
	})
}
func TestStreamIDMarshalUnmarshal(t *testing.T) {
f := func(sid *streamID, marshaledLen int) {
t.Helper()