mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vlselect: follow-up for 451d2abf50
- Consistently return the first `limit` log entries if the total size of found log entries doesn't exceed 1Mb. See app/vlselect/logsql/sort_writer.go . Previously random log entries could be returned with each request. - Document the change at docs/VictoriaLogs/CHANGELOG.md - Document the `limit` query arg at docs/VictoriaLogs/querying/README.md - Make the change less intrusive. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5778
This commit is contained in:
parent
451d2abf50
commit
0514091948
11 changed files with 169 additions and 116 deletions
|
@ -2,7 +2,6 @@ package logsql
|
|||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
|
@ -19,13 +18,18 @@ var (
|
|||
)
|
||||
|
||||
// ProcessQueryRequest handles /select/logsql/query request
|
||||
func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}) {
|
||||
func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan struct{}, cancel func()) {
|
||||
// Extract tenantID
|
||||
tenantID, err := logstorage.GetTenantIDFromRequest(r)
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return
|
||||
}
|
||||
limit, err := httputils.GetInt(r, "limit")
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return
|
||||
}
|
||||
|
||||
qStr := r.FormValue("query")
|
||||
q, err := logstorage.ParseQuery(qStr)
|
||||
|
@ -33,36 +37,27 @@ func ProcessQueryRequest(w http.ResponseWriter, r *http.Request, stopCh <-chan s
|
|||
httpserver.Errorf(w, r, "cannot parse query [%s]: %s", qStr, err)
|
||||
return
|
||||
}
|
||||
limit, err := httputils.GetInt(r, "limit")
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "cannot parse limit from the request: %s", err)
|
||||
return
|
||||
}
|
||||
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
||||
sw := getSortWriter()
|
||||
sw.Init(w, maxSortBufferSize.IntN())
|
||||
tenantIDs := []logstorage.TenantID{tenantID}
|
||||
|
||||
var mx sync.Mutex
|
||||
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) bool {
|
||||
sw := getSortWriter()
|
||||
sw.Init(w, maxSortBufferSize.IntN(), limit)
|
||||
tenantIDs := []logstorage.TenantID{tenantID}
|
||||
vlstorage.RunQuery(tenantIDs, q, stopCh, func(columns []logstorage.BlockColumn) {
|
||||
if len(columns) == 0 {
|
||||
return true
|
||||
return
|
||||
}
|
||||
rowsCount := len(columns[0].Values)
|
||||
mx.Lock()
|
||||
if rowsCount > limit {
|
||||
rowsCount = limit
|
||||
}
|
||||
limit = limit - rowsCount
|
||||
mx.Unlock()
|
||||
|
||||
bb := blockResultPool.Get()
|
||||
for rowIdx := 0; rowIdx < rowsCount; rowIdx++ {
|
||||
WriteJSONRow(bb, columns, rowIdx)
|
||||
}
|
||||
sw.MustWrite(bb.B)
|
||||
blockResultPool.Put(bb)
|
||||
|
||||
return limit == 0
|
||||
if !sw.TryWrite(bb.B) {
|
||||
cancel()
|
||||
}
|
||||
|
||||
blockResultPool.Put(bb)
|
||||
})
|
||||
sw.FinalFlush()
|
||||
putSortWriter(sw)
|
||||
|
|
|
@ -36,8 +36,12 @@ var sortWriterPool sync.Pool
|
|||
// If the buf isn't empty at FinalFlush() call, then the buffered data
|
||||
// is sorted by _time field.
|
||||
type sortWriter struct {
|
||||
mu sync.Mutex
|
||||
w io.Writer
|
||||
mu sync.Mutex
|
||||
w io.Writer
|
||||
|
||||
maxLines int
|
||||
linesWritten int
|
||||
|
||||
maxBufLen int
|
||||
buf []byte
|
||||
bufFlushed bool
|
||||
|
@ -47,58 +51,121 @@ type sortWriter struct {
|
|||
|
||||
func (sw *sortWriter) reset() {
|
||||
sw.w = nil
|
||||
|
||||
sw.maxLines = 0
|
||||
sw.linesWritten = 0
|
||||
|
||||
sw.maxBufLen = 0
|
||||
sw.buf = sw.buf[:0]
|
||||
sw.bufFlushed = false
|
||||
sw.hasErr = false
|
||||
}
|
||||
|
||||
func (sw *sortWriter) Init(w io.Writer, maxBufLen int) {
|
||||
// Init initializes sw.
|
||||
//
|
||||
// If maxLines is set to positive value, then sw accepts up to maxLines
|
||||
// and then rejects all the other lines by returning false from TryWrite.
|
||||
func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
|
||||
sw.reset()
|
||||
|
||||
sw.w = w
|
||||
sw.maxBufLen = maxBufLen
|
||||
sw.maxLines = maxLines
|
||||
}
|
||||
|
||||
func (sw *sortWriter) MustWrite(p []byte) {
|
||||
// TryWrite writes p to sw.
|
||||
//
|
||||
// True is returned on successful write, false otherwise.
|
||||
//
|
||||
// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw.
|
||||
func (sw *sortWriter) TryWrite(p []byte) bool {
|
||||
sw.mu.Lock()
|
||||
defer sw.mu.Unlock()
|
||||
|
||||
if sw.hasErr {
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
if sw.bufFlushed {
|
||||
if _, err := sw.w.Write(p); err != nil {
|
||||
if !sw.writeToUnderlyingWriterLocked(p) {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
if len(sw.buf)+len(p) < sw.maxBufLen {
|
||||
sw.buf = append(sw.buf, p...)
|
||||
return
|
||||
return true
|
||||
}
|
||||
|
||||
sw.bufFlushed = true
|
||||
if len(sw.buf) > 0 {
|
||||
if _, err := sw.w.Write(sw.buf); err != nil {
|
||||
sw.hasErr = true
|
||||
return
|
||||
if !sw.writeToUnderlyingWriterLocked(sw.buf) {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
sw.buf = sw.buf[:0]
|
||||
|
||||
if !sw.writeToUnderlyingWriterLocked(p) {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
|
||||
if len(p) == 0 {
|
||||
return true
|
||||
}
|
||||
if sw.maxLines > 0 {
|
||||
if sw.linesWritten >= sw.maxLines {
|
||||
return false
|
||||
}
|
||||
sw.buf = sw.buf[:0]
|
||||
var linesLeft int
|
||||
p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
|
||||
println("DEBUG: end trimLines", string(p), linesLeft)
|
||||
sw.linesWritten += linesLeft
|
||||
}
|
||||
if _, err := sw.w.Write(p); err != nil {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func trimLines(p []byte, maxLines int) ([]byte, int) {
|
||||
println("DEBUG: start trimLines", string(p), maxLines)
|
||||
if maxLines <= 0 {
|
||||
return nil, 0
|
||||
}
|
||||
n := bytes.Count(p, newline)
|
||||
if n < maxLines {
|
||||
return p, n
|
||||
}
|
||||
for n >= maxLines {
|
||||
idx := bytes.LastIndexByte(p, '\n')
|
||||
p = p[:idx]
|
||||
n--
|
||||
}
|
||||
return p[:len(p)+1], maxLines
|
||||
}
|
||||
|
||||
var newline = []byte("\n")
|
||||
|
||||
func (sw *sortWriter) FinalFlush() {
|
||||
if sw.hasErr || sw.bufFlushed {
|
||||
return
|
||||
}
|
||||
|
||||
rs := getRowsSorter()
|
||||
rs.parseRows(sw.buf)
|
||||
rs.sort()
|
||||
WriteJSONRows(sw.w, rs.rows)
|
||||
|
||||
rows := rs.rows
|
||||
if sw.maxLines > 0 && len(rows) > sw.maxLines {
|
||||
rows = rows[:sw.maxLines]
|
||||
}
|
||||
WriteJSONRows(sw.w, rows)
|
||||
|
||||
putRowsSorter(rs)
|
||||
}
|
||||
|
||||
|
|
|
@ -7,15 +7,16 @@ import (
|
|||
)
|
||||
|
||||
func TestSortWriter(t *testing.T) {
|
||||
f := func(maxBufLen int, data string, expectedResult string) {
|
||||
f := func(maxBufLen, maxLines int, data string, expectedResult string) {
|
||||
t.Helper()
|
||||
|
||||
var bb bytes.Buffer
|
||||
sw := getSortWriter()
|
||||
sw.Init(&bb, maxBufLen)
|
||||
|
||||
sw.Init(&bb, maxBufLen, maxLines)
|
||||
for _, s := range strings.Split(data, "\n") {
|
||||
sw.MustWrite([]byte(s + "\n"))
|
||||
if !sw.TryWrite([]byte(s + "\n")) {
|
||||
break
|
||||
}
|
||||
}
|
||||
sw.FinalFlush()
|
||||
putSortWriter(sw)
|
||||
|
@ -26,14 +27,20 @@ func TestSortWriter(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
f(100, "", "")
|
||||
f(100, "{}", "{}\n")
|
||||
f(100, 0, "", "")
|
||||
f(100, 0, "{}", "{}\n")
|
||||
|
||||
data := `{"_time":"def","_msg":"xxx"}
|
||||
{"_time":"abc","_msg":"foo"}`
|
||||
resultExpected := `{"_time":"abc","_msg":"foo"}
|
||||
{"_time":"def","_msg":"xxx"}
|
||||
`
|
||||
f(100, data, resultExpected)
|
||||
f(10, data, data+"\n")
|
||||
f(100, 0, data, resultExpected)
|
||||
f(10, 0, data, data+"\n")
|
||||
|
||||
// Test with the maxLines
|
||||
f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
|
||||
f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
|
||||
f(10, 2, data, data+"\n")
|
||||
f(100, 2, data, resultExpected)
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package vlselect
|
||||
|
||||
import (
|
||||
"context"
|
||||
"embed"
|
||||
"flag"
|
||||
"fmt"
|
||||
|
@ -101,7 +102,8 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
|
||||
// Limit the number of concurrent queries, which can consume big amounts of CPU.
|
||||
startTime := time.Now()
|
||||
stopCh := r.Context().Done()
|
||||
ctx := r.Context()
|
||||
stopCh := ctx.Done()
|
||||
select {
|
||||
case concurrencyLimitCh <- struct{}{}:
|
||||
defer func() { <-concurrencyLimitCh }()
|
||||
|
@ -139,11 +141,15 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
|||
}
|
||||
}
|
||||
|
||||
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
stopCh = ctxWithCancel.Done()
|
||||
|
||||
switch {
|
||||
case path == "/logsql/query":
|
||||
logsqlQueryRequests.Inc()
|
||||
httpserver.EnableCORS(w, r)
|
||||
logsql.ProcessQueryRequest(w, r, stopCh)
|
||||
logsql.ProcessQueryRequest(w, r, stopCh, cancel)
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
|
|
@ -100,7 +100,7 @@ func MustAddRows(lr *logstorage.LogRows) {
|
|||
}
|
||||
|
||||
// RunQuery runs the given q and calls processBlock for the returned data blocks
|
||||
func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn) bool) {
|
||||
func RunQuery(tenantIDs []logstorage.TenantID, q *logstorage.Query, stopCh <-chan struct{}, processBlock func(columns []logstorage.BlockColumn)) {
|
||||
strg.RunQuery(tenantIDs, q, stopCh, processBlock)
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,6 @@ Released at 2024-02-14
|
|||
* FEATURE: [dashboards/all](https://grafana.com/orgs/victoriametrics): add new panel `CPU spent on GC`. It should help identifying cases when too much CPU is spent on garbage collection, and advice users on how this can be addressed.
|
||||
* FEATURE: [vmalert](https://docs.victoriametrics.com/#vmalert): support [filtering](https://prometheus.io/docs/prometheus/2.49/querying/api/#rules) for `/api/v1/rules` API. See [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5749) by @victoramsantos.
|
||||
* FEATURE: [vmbackup](https://docs.victoriametrics.com/vmbackup.html): support client-side TLS configuration for creating and deleting snapshots via `-snapshot.tls*` cmd-line flags. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5724). Thanks to @khushijain21 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5738).
|
||||
* FEATURE: enable `limit` query param for `/select/logsql/query` VictoriaLogs API endpoint. By default this limit sets to 1000 lines, but it can be changed via query param. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674).
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): reduce CPU usage when `-promscrape.dropOriginalLabels` command-line flag is set. This issue has been introduced in [v1.96.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.96.0) when addressing [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5389).
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth.html): properly release memory during config reload. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4690).
|
||||
|
|
|
@ -19,6 +19,8 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/QuickSta
|
|||
|
||||
## tip
|
||||
|
||||
* FEATURE: support the ability to limit the number of returned log entries from [HTTP querying API](https://docs.victoriametrics.com/victorialogs/querying/#http-api) by passing `limit` query arg. Previously all the matching log entries were returned until closing the response stream. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5674). Thanks to @dmitryk-dk for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5778).
|
||||
|
||||
## [v0.4.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.4.2-victorialogs)
|
||||
|
||||
Released at 2023-11-15
|
||||
|
|
|
@ -44,6 +44,15 @@ See [LogsQL docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html) for
|
|||
The `query` arg must be properly encoded with [percent encoding](https://en.wikipedia.org/wiki/URL_encoding) when passing it to `curl`
|
||||
or similar tools.
|
||||
|
||||
By default the `/select/logsql/query` returns all the log entries matching the given `query`. The response size can be limited in the following ways:
|
||||
|
||||
- By closing the response stream at any time. In this case VictoriaLogs stops query execution and frees all the resources occupied by the request.
|
||||
- By specifying the maximum number of log entries, which can be returned in the response via `limit` query arg. For example, the following request returns
|
||||
up to 10 matching log entries:
|
||||
```sh
|
||||
curl http://localhost:9428/select/logsql/query -d 'query=error' -d 'limit=10'
|
||||
```
|
||||
|
||||
The `/select/logsql/query` endpoint returns [a stream of JSON lines](https://jsonlines.org/),
|
||||
where each line contains JSON-encoded log entry in the form `{field1="value1",...,fieldN="valueN"}`.
|
||||
Example response:
|
||||
|
|
|
@ -9226,7 +9226,7 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
|
|||
resultColumnNames: []string{resultColumnName},
|
||||
}
|
||||
workersCount := 3
|
||||
s.search(workersCount, so, nil, func(workerID uint, br *blockResult) bool {
|
||||
s.search(workersCount, so, nil, func(workerID uint, br *blockResult) {
|
||||
// Verify tenantID
|
||||
if !br.streamID.tenantID.equal(&tenantID) {
|
||||
t.Fatalf("unexpected tenantID in blockResult; got %s; want %s", &br.streamID.tenantID, &tenantID)
|
||||
|
@ -9248,7 +9248,6 @@ func testFilterMatchForStorage(t *testing.T, s *Storage, tenantID TenantID, f fi
|
|||
if !reflect.DeepEqual(br.timestamps, expectedTimestamps) {
|
||||
t.Fatalf("unexpected timestamps;\ngot\n%d\nwant\n%d", br.timestamps, expectedTimestamps)
|
||||
}
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -44,7 +43,7 @@ type searchOptions struct {
|
|||
}
|
||||
|
||||
// RunQuery runs the given q and calls processBlock for results
|
||||
func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn) bool) {
|
||||
func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{}, processBlock func(columns []BlockColumn)) {
|
||||
resultColumnNames := q.getResultColumnNames()
|
||||
so := &genericSearchOptions{
|
||||
tenantIDs: tenantIDs,
|
||||
|
@ -52,7 +51,7 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
|
|||
resultColumnNames: resultColumnNames,
|
||||
}
|
||||
workersCount := cgroup.AvailableCPUs()
|
||||
s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) bool {
|
||||
s.search(workersCount, so, stopCh, func(workerID uint, br *blockResult) {
|
||||
brs := getBlockRows()
|
||||
cs := brs.cs
|
||||
|
||||
|
@ -62,11 +61,10 @@ func (s *Storage) RunQuery(tenantIDs []TenantID, q *Query, stopCh <-chan struct{
|
|||
Values: br.getColumnValues(i),
|
||||
})
|
||||
}
|
||||
limitReached := processBlock(cs)
|
||||
processBlock(cs)
|
||||
|
||||
brs.cs = cs
|
||||
putBlockRows(brs)
|
||||
return limitReached
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -120,7 +118,7 @@ const blockSearchWorksPerBatch = 64
|
|||
// searchResultFunc must process sr.
|
||||
//
|
||||
// The callback is called at the worker with the given workerID.
|
||||
type searchResultFunc func(workerID uint, br *blockResult) bool
|
||||
type searchResultFunc func(workerID uint, br *blockResult)
|
||||
|
||||
// search searches for the matching rows according to so.
|
||||
//
|
||||
|
@ -130,34 +128,19 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
|||
var wg sync.WaitGroup
|
||||
workCh := make(chan []*blockSearchWork, workersCount)
|
||||
wg.Add(workersCount)
|
||||
|
||||
ctx, cancelFn := context.WithCancel(context.Background())
|
||||
|
||||
for i := 0; i < workersCount; i++ {
|
||||
go func(workerID uint) {
|
||||
defer wg.Done()
|
||||
bs := getBlockSearch()
|
||||
defer putBlockSearch(bs)
|
||||
for {
|
||||
select {
|
||||
case bsws, ok := <-workCh:
|
||||
if !ok {
|
||||
return
|
||||
for bsws := range workCh {
|
||||
for _, bsw := range bsws {
|
||||
bs.search(bsw)
|
||||
if bs.br.RowsCount() > 0 {
|
||||
processBlockResult(workerID, &bs.br)
|
||||
}
|
||||
for _, bsw := range bsws {
|
||||
bs.search(bsw)
|
||||
if bs.br.RowsCount() > 0 {
|
||||
limitReached := processBlockResult(workerID, &bs.br)
|
||||
if limitReached {
|
||||
cancelFn()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
putBlockSearch(bs)
|
||||
wg.Done()
|
||||
}(uint(i))
|
||||
}
|
||||
|
||||
|
@ -195,7 +178,6 @@ func (s *Storage) search(workersCount int, so *genericSearchOptions, stopCh <-ch
|
|||
// Wait until workers finish their work
|
||||
close(workCh)
|
||||
wg.Wait()
|
||||
cancelFn()
|
||||
|
||||
// Decrement references to parts
|
||||
for _, pw := range pws {
|
||||
|
|
|
@ -84,7 +84,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
AccountID: 0,
|
||||
ProjectID: 0,
|
||||
}
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
|
@ -96,7 +96,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
AccountID: 1,
|
||||
ProjectID: 11,
|
||||
}
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
|
@ -111,7 +111,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
}
|
||||
expectedTenantID := tenantID.String()
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
hasTenantIDColumn := false
|
||||
var columnNames []string
|
||||
for _, c := range columns {
|
||||
|
@ -132,7 +132,6 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
panic(fmt.Errorf("missing tenant.id column among columns: %q", columnNames))
|
||||
}
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||
|
@ -146,9 +145,8 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
t.Run("matching-multiple-tenant-ids", func(t *testing.T) {
|
||||
q := mustParseQuery(`"log message"`)
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
||||
|
||||
|
@ -160,9 +158,8 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
t.Run("matching-in-filter", func(t *testing.T) {
|
||||
q := mustParseQuery(`source-file:in(foobar,/foo/bar/baz)`)
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
||||
|
||||
|
@ -173,7 +170,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
})
|
||||
t.Run("stream-filter-mismatch", func(t *testing.T) {
|
||||
q := mustParseQuery(`_stream:{job="foobar",instance=~"host-.+:2345"} log`)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.RunQuery(allTenantIDs, q, nil, processBlock)
|
||||
|
@ -187,7 +184,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
}
|
||||
expectedStreamID := fmt.Sprintf("stream_id=%d", i)
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
hasStreamIDColumn := false
|
||||
var columnNames []string
|
||||
for _, c := range columns {
|
||||
|
@ -208,7 +205,6 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
panic(fmt.Errorf("missing stream-id column among columns: %q", columnNames))
|
||||
}
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||
|
@ -226,9 +222,8 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
ProjectID: 11,
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||
|
@ -247,9 +242,8 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
ProjectID: 11,
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||
|
@ -268,9 +262,8 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
ProjectID: 11,
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
atomic.AddUint32(&rowsCount, uint32(len(columns[0].Values)))
|
||||
return false
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
s.RunQuery(tenantIDs, q, nil, processBlock)
|
||||
|
@ -288,7 +281,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
AccountID: 1,
|
||||
ProjectID: 11,
|
||||
}
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
|
@ -302,7 +295,7 @@ func TestStorageRunQuery(t *testing.T) {
|
|||
AccountID: 1,
|
||||
ProjectID: 11,
|
||||
}
|
||||
processBlock := func(columns []BlockColumn) bool {
|
||||
processBlock := func(columns []BlockColumn) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
tenantIDs := []TenantID{tenantID}
|
||||
|
@ -412,7 +405,7 @@ func TestStorageSearch(t *testing.T) {
|
|||
filter: f,
|
||||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
@ -430,7 +423,7 @@ func TestStorageSearch(t *testing.T) {
|
|||
filter: f,
|
||||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
@ -448,7 +441,7 @@ func TestStorageSearch(t *testing.T) {
|
|||
filter: f,
|
||||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
@ -468,12 +461,11 @@ func TestStorageSearch(t *testing.T) {
|
|||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
if !br.streamID.tenantID.equal(&tenantID) {
|
||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||
}
|
||||
atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
|
||||
return false
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
||||
|
@ -493,9 +485,8 @@ func TestStorageSearch(t *testing.T) {
|
|||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
|
||||
return false
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
||||
|
@ -514,7 +505,7 @@ func TestStorageSearch(t *testing.T) {
|
|||
filter: f,
|
||||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
@ -535,12 +526,11 @@ func TestStorageSearch(t *testing.T) {
|
|||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
if !br.streamID.tenantID.equal(&tenantID) {
|
||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||
}
|
||||
atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
|
||||
return false
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
||||
|
@ -565,12 +555,11 @@ func TestStorageSearch(t *testing.T) {
|
|||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
if !br.streamID.tenantID.equal(&tenantID) {
|
||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||
}
|
||||
atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
|
||||
return false
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
||||
|
@ -603,12 +592,11 @@ func TestStorageSearch(t *testing.T) {
|
|||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
if !br.streamID.tenantID.equal(&tenantID) {
|
||||
panic(fmt.Errorf("unexpected tenantID; got %s; want %s", &br.streamID.tenantID, &tenantID))
|
||||
}
|
||||
atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
|
||||
return false
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
||||
|
@ -632,9 +620,8 @@ func TestStorageSearch(t *testing.T) {
|
|||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
rowsCount := uint32(0)
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
atomic.AddUint32(&rowsCount, uint32(br.RowsCount()))
|
||||
return false
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
||||
|
@ -657,7 +644,7 @@ func TestStorageSearch(t *testing.T) {
|
|||
filter: f,
|
||||
resultColumnNames: []string{"_msg"},
|
||||
}
|
||||
processBlock := func(workerID uint, br *blockResult) bool {
|
||||
processBlock := func(workerID uint, br *blockResult) {
|
||||
panic(fmt.Errorf("unexpected match"))
|
||||
}
|
||||
s.search(workersCount, so, nil, processBlock)
|
||||
|
|
Loading…
Reference in a new issue