This commit is contained in:
Aliaksandr Valialkin 2024-05-14 03:04:11 +02:00
parent 1834c68250
commit 2314cdc38d
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
9 changed files with 70 additions and 368 deletions

View file

@ -0,0 +1,47 @@
package logsql
import (
"bufio"
"io"
"sync"
)
func getBufferedWriter(w io.Writer) *bufferedWriter {
v := bufferedWriterPool.Get()
if v == nil {
return &bufferedWriter{
bw: bufio.NewWriter(w),
}
}
bw := v.(*bufferedWriter)
bw.bw.Reset(w)
return bw
}
func putBufferedWriter(bw *bufferedWriter) {
bw.reset()
bufferedWriterPool.Put(bw)
}
var bufferedWriterPool sync.Pool
type bufferedWriter struct {
mu sync.Mutex
bw *bufio.Writer
}
func (bw *bufferedWriter) reset() {
// nothing to do
}
func (bw *bufferedWriter) WriteIgnoreErrors(p []byte) {
bw.mu.Lock()
_, _ = bw.bw.Write(p)
bw.mu.Unlock()
}
func (bw *bufferedWriter) FlushIgnoreErrors() {
bw.mu.Lock()
_ = bw.bw.Flush()
bw.mu.Unlock()
}

View file

@ -6,18 +6,11 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
var (
maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
"if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+
"too big value for this flag may result in high memory usage since the sorting is performed in memory")
)
// ProcessQueryRequest handles /select/logsql/query request.
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
// Extract tenantID
@ -40,12 +33,13 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
}
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
sw := getSortWriter()
sw.Init(w, maxSortBufferSize.IntN(), limit)
if limit > 0 {
q.AddPipeLimit(uint64(limit))
}
tenantIDs := []logstorage.TenantID{tenantID}
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()
bw := getBufferedWriter(w)
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
if len(columns) == 0 {
@ -56,18 +50,14 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
for i := range timestamps {
WriteJSONRow(bb, columns, i)
}
if !sw.TryWrite(bb.B) {
cancel()
}
bw.WriteIgnoreErrors(bb.B)
blockResultPool.Put(bb)
}
err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
sw.FinalFlush()
putSortWriter(sw)
bw.FlushIgnoreErrors()
putBufferedWriter(bw)
if err != nil {
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", qStr, err)

View file

@ -1,290 +0,0 @@
package logsql
import (
"bytes"
"io"
"sort"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)
func getSortWriter() *sortWriter {
v := sortWriterPool.Get()
if v == nil {
return &sortWriter{}
}
return v.(*sortWriter)
}
func putSortWriter(sw *sortWriter) {
sw.reset()
sortWriterPool.Put(sw)
}
var sortWriterPool sync.Pool
// sortWriter expects JSON line stream to be written to it.
//
// It buffers the incoming data until its size reaches maxBufLen.
// Then it streams the buffered data and all the incoming data to w.
//
// The FinalFlush() must be called when all the data is written.
// If the buf isn't empty at FinalFlush() call, then the buffered data
// is sorted by _time field.
type sortWriter struct {
mu sync.Mutex
w io.Writer
maxLines int
linesWritten int
maxBufLen int
buf []byte
bufFlushed bool
hasErr bool
}
func (sw *sortWriter) reset() {
sw.w = nil
sw.maxLines = 0
sw.linesWritten = 0
sw.maxBufLen = 0
sw.buf = sw.buf[:0]
sw.bufFlushed = false
sw.hasErr = false
}
// Init initializes sw.
//
// If maxLines is set to positive value, then sw accepts up to maxLines
// and then rejects all the other lines by returning false from TryWrite.
func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
sw.reset()
sw.w = w
sw.maxBufLen = maxBufLen
sw.maxLines = maxLines
}
// TryWrite writes p to sw.
//
// True is returned on successful write, false otherwise.
//
// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw.
func (sw *sortWriter) TryWrite(p []byte) bool {
sw.mu.Lock()
defer sw.mu.Unlock()
if sw.hasErr {
return false
}
if sw.bufFlushed {
if !sw.writeToUnderlyingWriterLocked(p) {
sw.hasErr = true
return false
}
return true
}
if len(sw.buf)+len(p) < sw.maxBufLen {
sw.buf = append(sw.buf, p...)
return true
}
sw.bufFlushed = true
if !sw.writeToUnderlyingWriterLocked(sw.buf) {
sw.hasErr = true
return false
}
sw.buf = sw.buf[:0]
if !sw.writeToUnderlyingWriterLocked(p) {
sw.hasErr = true
return false
}
return true
}
func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
if len(p) == 0 {
return true
}
if sw.maxLines > 0 {
if sw.linesWritten >= sw.maxLines {
return false
}
var linesLeft int
p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
sw.linesWritten += linesLeft
}
if _, err := sw.w.Write(p); err != nil {
return false
}
return true
}
func trimLines(p []byte, maxLines int) ([]byte, int) {
if maxLines <= 0 {
return nil, 0
}
n := bytes.Count(p, newline)
if n < maxLines {
return p, n
}
for n >= maxLines {
idx := bytes.LastIndexByte(p, '\n')
p = p[:idx]
n--
}
return p[:len(p)+1], maxLines
}
var newline = []byte("\n")
func (sw *sortWriter) FinalFlush() {
if sw.hasErr || sw.bufFlushed {
return
}
rs := getRowsSorter()
rs.parseRows(sw.buf)
rs.sort()
rows := rs.rows
if sw.maxLines > 0 && len(rows) > sw.maxLines {
rows = rows[:sw.maxLines]
}
WriteJSONRows(sw.w, rows)
putRowsSorter(rs)
}
func getRowsSorter() *rowsSorter {
v := rowsSorterPool.Get()
if v == nil {
return &rowsSorter{}
}
return v.(*rowsSorter)
}
func putRowsSorter(rs *rowsSorter) {
rs.reset()
rowsSorterPool.Put(rs)
}
var rowsSorterPool sync.Pool
type rowsSorter struct {
buf []byte
fieldsBuf []logstorage.Field
rows [][]logstorage.Field
times []string
}
func (rs *rowsSorter) reset() {
rs.buf = rs.buf[:0]
fieldsBuf := rs.fieldsBuf
for i := range fieldsBuf {
fieldsBuf[i].Reset()
}
rs.fieldsBuf = fieldsBuf[:0]
rows := rs.rows
for i := range rows {
rows[i] = nil
}
rs.rows = rows[:0]
times := rs.times
for i := range times {
times[i] = ""
}
rs.times = times[:0]
}
func (rs *rowsSorter) parseRows(src []byte) {
rs.reset()
buf := rs.buf
fieldsBuf := rs.fieldsBuf
rows := rs.rows
times := rs.times
p := logjson.GetParser()
for len(src) > 0 {
var line []byte
n := bytes.IndexByte(src, '\n')
if n < 0 {
line = src
src = nil
} else {
line = src[:n]
src = src[n+1:]
}
if len(line) == 0 {
continue
}
if err := p.ParseLogMessage(line); err != nil {
logger.Panicf("BUG: unexpected invalid JSON line: %s", err)
}
timeValue := ""
fieldsBufLen := len(fieldsBuf)
for _, f := range p.Fields {
bufLen := len(buf)
buf = append(buf, f.Name...)
name := bytesutil.ToUnsafeString(buf[bufLen:])
bufLen = len(buf)
buf = append(buf, f.Value...)
value := bytesutil.ToUnsafeString(buf[bufLen:])
fieldsBuf = append(fieldsBuf, logstorage.Field{
Name: name,
Value: value,
})
if name == "_time" {
timeValue = value
}
}
rows = append(rows, fieldsBuf[fieldsBufLen:])
times = append(times, timeValue)
}
logjson.PutParser(p)
rs.buf = buf
rs.fieldsBuf = fieldsBuf
rs.rows = rows
rs.times = times
}
func (rs *rowsSorter) Len() int {
return len(rs.rows)
}
func (rs *rowsSorter) Less(i, j int) bool {
times := rs.times
return times[i] < times[j]
}
func (rs *rowsSorter) Swap(i, j int) {
times := rs.times
rows := rs.rows
times[i], times[j] = times[j], times[i]
rows[i], rows[j] = rows[j], rows[i]
}
func (rs *rowsSorter) sort() {
sort.Sort(rs)
}

View file

@ -1,46 +0,0 @@
package logsql
import (
"bytes"
"strings"
"testing"
)
func TestSortWriter(t *testing.T) {
f := func(maxBufLen, maxLines int, data string, expectedResult string) {
t.Helper()
var bb bytes.Buffer
sw := getSortWriter()
sw.Init(&bb, maxBufLen, maxLines)
for _, s := range strings.Split(data, "\n") {
if !sw.TryWrite([]byte(s + "\n")) {
break
}
}
sw.FinalFlush()
putSortWriter(sw)
result := bb.String()
if result != expectedResult {
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
}
}
f(100, 0, "", "")
f(100, 0, "{}", "{}\n")
data := `{"_time":"def","_msg":"xxx"}
{"_time":"abc","_msg":"foo"}`
resultExpected := `{"_time":"abc","_msg":"foo"}
{"_time":"def","_msg":"xxx"}
`
f(100, 0, data, resultExpected)
f(10, 0, data, data+"\n")
// Test with the maxLines
f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
f(10, 2, data, data+"\n")
f(100, 2, data, resultExpected)
}

View file

@ -1641,11 +1641,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying
## Sorting
By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to 1MB).
Otherwise sorting is skipped because of performance reasons.
Use [`sort` pipe](#sort-pipe) for sorting the results.
By default VictoriaLogs doesn't sort the returned results because of performance reasons. Use [`sort` pipe](#sort-pipe) for sorting the results.
## Limiters

View file

@ -252,9 +252,6 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
The maximum duration for query execution (default 30s)
-search.maxQueueDuration duration
The maximum time the search request waits for execution when -search.maxConcurrentRequests limit is reached; see also -search.maxQueryDuration (default 10s)
-select.maxSortBufferSize size
Query results from /select/logsql/query are automatically sorted by _time if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; too big value for this flag may result in high memory usage since the sorting is performed in memory
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1048576)
-storageDataPath string
Path to directory with the VictoriaLogs data; see https://docs.victoriametrics.com/VictoriaLogs/#storage (default "victoria-logs-data")
-storage.minFreeDiskSpaceBytes size

View file

@ -66,10 +66,8 @@ The response can be interrupted at any time by closing the connection to Victori
This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
See [these docs](#command-line) for more details.
The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
The returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sort-pipe)
or at client side with the usual `sort` command according to [these docs](#command-line).
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy) is queried.

View file

@ -261,7 +261,8 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
}
// Initialize timestamps, since they are required for all the further work with br.
if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") {
so := bs.bsw.so
if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") {
// The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes.
rowsLen := bm.onesCount()
br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen)

View file

@ -206,6 +206,15 @@ func (q *Query) String() string {
return s
}
// AddPipeLimit adds `| limit n` pipe to q.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
func (q *Query) AddPipeLimit(n uint64) {
q.pipes = append(q.pipes, &pipeLimit{
n: n,
})
}
func (q *Query) getNeededColumns() ([]string, []string) {
neededFields := newFieldsSet()
neededFields.add("*")