diff --git a/app/vlselect/logsql/buffered_writer.go b/app/vlselect/logsql/buffered_writer.go
new file mode 100644
index 000000000..57ad56199
--- /dev/null
+++ b/app/vlselect/logsql/buffered_writer.go
@@ -0,0 +1,47 @@
+package logsql
+
+import (
+	"bufio"
+	"io"
+	"sync"
+)
+
+func getBufferedWriter(w io.Writer) *bufferedWriter {
+	v := bufferedWriterPool.Get()
+	if v == nil {
+		return &bufferedWriter{
+			bw: bufio.NewWriter(w),
+		}
+	}
+	bw := v.(*bufferedWriter)
+	bw.bw.Reset(w)
+	return bw
+}
+
+func putBufferedWriter(bw *bufferedWriter) {
+	bw.reset()
+	bufferedWriterPool.Put(bw)
+}
+
+var bufferedWriterPool sync.Pool
+
+type bufferedWriter struct {
+	mu sync.Mutex
+	bw *bufio.Writer
+}
+
+func (bw *bufferedWriter) reset() {
+	// nothing to do
+}
+
+func (bw *bufferedWriter) WriteIgnoreErrors(p []byte) {
+	bw.mu.Lock()
+	_, _ = bw.bw.Write(p)
+	bw.mu.Unlock()
+}
+
+func (bw *bufferedWriter) FlushIgnoreErrors() {
+	bw.mu.Lock()
+	_ = bw.bw.Flush()
+	bw.mu.Unlock()
+}
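Note: a minimal sketch of how this pooled writer is meant to be used — acquire, stream blocks, flush once, return to the pool. The helper name `writeResults` is hypothetical; the get/write/flush/put sequence mirrors `ProcessQueryRequest` below.

```go
package logsql

import "io"

// writeResults is a hypothetical helper illustrating the lifecycle of the
// pooled writer: acquire it, stream result blocks through it, flush once at
// the end, then return it to the pool for reuse.
func writeResults(w io.Writer, blocks [][]byte) {
	bw := getBufferedWriter(w)
	for _, b := range blocks {
		bw.WriteIgnoreErrors(b) // write errors are dropped: the client may have gone away
	}
	bw.FlushIgnoreErrors()
	putBufferedWriter(bw)
}
```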
diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index 0b861ccda..6cf2b266f 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -6,18 +6,11 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
-var (
-	maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
-		"if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+
-		"too big value for this flag may result in high memory usage since the sorting is performed in memory")
-)
-
 // ProcessQueryRequest handles /select/logsql/query request.
 func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 	// Extract tenantID
@@ -40,12 +33,13 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 	}
 
 	w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
-	sw := getSortWriter()
-	sw.Init(w, maxSortBufferSize.IntN(), limit)
+	if limit > 0 {
+		q.AddPipeLimit(uint64(limit))
+	}
+
 	tenantIDs := []logstorage.TenantID{tenantID}
 
-	ctxWithCancel, cancel := context.WithCancel(ctx)
-	defer cancel()
+	bw := getBufferedWriter(w)
 
 	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
 		if len(columns) == 0 {
@@ -56,18 +50,14 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 		}
 		for i := range timestamps {
 			WriteJSONRow(bb, columns, i)
 		}
-
-		if !sw.TryWrite(bb.B) {
-			cancel()
-		}
-
+		bw.WriteIgnoreErrors(bb.B)
 		blockResultPool.Put(bb)
 	}
 
-	err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
+	err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
 
-	sw.FinalFlush()
-	putSortWriter(sw)
+	bw.FlushIgnoreErrors()
+	putBufferedWriter(bw)
 
 	if err != nil {
 		httpserver.Errorf(w, r, "cannot execute query [%s]: %s", qStr, err)
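Note: the `limit` query arg is now translated into a `| limit N` pipe appended to the parsed query (see the `AddPipeLimit` addition in `lib/logstorage/parser.go` below), so the limit is enforced by the query engine instead of the writer. A rough sketch of the equivalence, assuming `logstorage.ParseQuery` as used elsewhere in vlselect:

```go
package main

import (
	"fmt"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	q, err := logstorage.ParseQuery(`_time:5m error`)
	if err != nil {
		panic(err)
	}
	q.AddPipeLimit(10)
	// The query now behaves roughly like `_time:5m error | limit 10`.
	fmt.Println(q)
}
```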
diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go
deleted file mode 100644
index 3ad0fbf70..000000000
--- a/app/vlselect/logsql/sort_writer.go
+++ /dev/null
@@ -1,290 +0,0 @@
-package logsql
-
-import (
-	"bytes"
-	"io"
-	"sort"
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
-)
-
-func getSortWriter() *sortWriter {
-	v := sortWriterPool.Get()
-	if v == nil {
-		return &sortWriter{}
-	}
-	return v.(*sortWriter)
-}
-
-func putSortWriter(sw *sortWriter) {
-	sw.reset()
-	sortWriterPool.Put(sw)
-}
-
-var sortWriterPool sync.Pool
-
-// sortWriter expects JSON line stream to be written to it.
-//
-// It buffers the incoming data until its size reaches maxBufLen.
-// Then it streams the buffered data and all the incoming data to w.
-//
-// The FinalFlush() must be called when all the data is written.
-// If the buf isn't empty at FinalFlush() call, then the buffered data
-// is sorted by _time field.
-type sortWriter struct {
-	mu sync.Mutex
-	w  io.Writer
-
-	maxLines     int
-	linesWritten int
-
-	maxBufLen  int
-	buf        []byte
-	bufFlushed bool
-
-	hasErr bool
-}
-
-func (sw *sortWriter) reset() {
-	sw.w = nil
-
-	sw.maxLines = 0
-	sw.linesWritten = 0
-
-	sw.maxBufLen = 0
-	sw.buf = sw.buf[:0]
-	sw.bufFlushed = false
-	sw.hasErr = false
-}
-
-// Init initializes sw.
-//
-// If maxLines is set to positive value, then sw accepts up to maxLines
-// and then rejects all the other lines by returning false from TryWrite.
-func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
-	sw.reset()
-
-	sw.w = w
-	sw.maxBufLen = maxBufLen
-	sw.maxLines = maxLines
-}
-
-// TryWrite writes p to sw.
-//
-// True is returned on successful write, false otherwise.
-//
-// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw.
-func (sw *sortWriter) TryWrite(p []byte) bool {
-	sw.mu.Lock()
-	defer sw.mu.Unlock()
-
-	if sw.hasErr {
-		return false
-	}
-
-	if sw.bufFlushed {
-		if !sw.writeToUnderlyingWriterLocked(p) {
-			sw.hasErr = true
-			return false
-		}
-		return true
-	}
-
-	if len(sw.buf)+len(p) < sw.maxBufLen {
-		sw.buf = append(sw.buf, p...)
-		return true
-	}
-
-	sw.bufFlushed = true
-	if !sw.writeToUnderlyingWriterLocked(sw.buf) {
-		sw.hasErr = true
-		return false
-	}
-	sw.buf = sw.buf[:0]
-
-	if !sw.writeToUnderlyingWriterLocked(p) {
-		sw.hasErr = true
-		return false
-	}
-	return true
-}
-
-func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
-	if len(p) == 0 {
-		return true
-	}
-	if sw.maxLines > 0 {
-		if sw.linesWritten >= sw.maxLines {
-			return false
-		}
-		var linesLeft int
-		p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
-		sw.linesWritten += linesLeft
-	}
-	if _, err := sw.w.Write(p); err != nil {
-		return false
-	}
-	return true
-}
-
-func trimLines(p []byte, maxLines int) ([]byte, int) {
-	if maxLines <= 0 {
-		return nil, 0
-	}
-	n := bytes.Count(p, newline)
-	if n < maxLines {
-		return p, n
-	}
-	for n >= maxLines {
-		idx := bytes.LastIndexByte(p, '\n')
-		p = p[:idx]
-		n--
-	}
-	return p[:len(p)+1], maxLines
-}
-
-var newline = []byte("\n")
-
-func (sw *sortWriter) FinalFlush() {
-	if sw.hasErr || sw.bufFlushed {
-		return
-	}
-
-	rs := getRowsSorter()
-	rs.parseRows(sw.buf)
-	rs.sort()
-
-	rows := rs.rows
-	if sw.maxLines > 0 && len(rows) > sw.maxLines {
-		rows = rows[:sw.maxLines]
-	}
-	WriteJSONRows(sw.w, rows)
-
-	putRowsSorter(rs)
-}
-
-func getRowsSorter() *rowsSorter {
-	v := rowsSorterPool.Get()
-	if v == nil {
-		return &rowsSorter{}
-	}
-	return v.(*rowsSorter)
-}
-
-func putRowsSorter(rs *rowsSorter) {
-	rs.reset()
-	rowsSorterPool.Put(rs)
-}
-
-var rowsSorterPool sync.Pool
-
-type rowsSorter struct {
-	buf       []byte
-	fieldsBuf []logstorage.Field
-	rows      [][]logstorage.Field
-	times     []string
-}
-
-func (rs *rowsSorter) reset() {
-	rs.buf = rs.buf[:0]
-
-	fieldsBuf := rs.fieldsBuf
-	for i := range fieldsBuf {
-		fieldsBuf[i].Reset()
-	}
-	rs.fieldsBuf = fieldsBuf[:0]
-
-	rows := rs.rows
-	for i := range rows {
-		rows[i] = nil
-	}
-	rs.rows = rows[:0]
-
-	times := rs.times
-	for i := range times {
-		times[i] = ""
-	}
-	rs.times = times[:0]
-}
-
-func (rs *rowsSorter) parseRows(src []byte) {
-	rs.reset()
-
-	buf := rs.buf
-	fieldsBuf := rs.fieldsBuf
-	rows := rs.rows
-	times := rs.times
-
-	p := logjson.GetParser()
-	for len(src) > 0 {
-		var line []byte
-		n := bytes.IndexByte(src, '\n')
-		if n < 0 {
-			line = src
-			src = nil
-		} else {
-			line = src[:n]
-			src = src[n+1:]
-		}
-		if len(line) == 0 {
-			continue
-		}
-
-		if err := p.ParseLogMessage(line); err != nil {
-			logger.Panicf("BUG: unexpected invalid JSON line: %s", err)
-		}
-
-		timeValue := ""
-		fieldsBufLen := len(fieldsBuf)
-		for _, f := range p.Fields {
-			bufLen := len(buf)
-			buf = append(buf, f.Name...)
-			name := bytesutil.ToUnsafeString(buf[bufLen:])
-
-			bufLen = len(buf)
-			buf = append(buf, f.Value...)
-			value := bytesutil.ToUnsafeString(buf[bufLen:])
-
-			fieldsBuf = append(fieldsBuf, logstorage.Field{
-				Name:  name,
-				Value: value,
-			})
-
-			if name == "_time" {
-				timeValue = value
-			}
-		}
-		rows = append(rows, fieldsBuf[fieldsBufLen:])
-		times = append(times, timeValue)
-	}
-	logjson.PutParser(p)
-
-	rs.buf = buf
-	rs.fieldsBuf = fieldsBuf
-	rs.rows = rows
-	rs.times = times
-}
-
-func (rs *rowsSorter) Len() int {
-	return len(rs.rows)
-}
-
-func (rs *rowsSorter) Less(i, j int) bool {
-	times := rs.times
-	return times[i] < times[j]
-}
-
-func (rs *rowsSorter) Swap(i, j int) {
-	times := rs.times
-	rows := rs.rows
-	times[i], times[j] = times[j], times[i]
-	rows[i], rows[j] = rows[j], rows[i]
-}
-
-func (rs *rowsSorter) sort() {
-	sort.Sort(rs)
-}
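Note: worth spelling out why the deleted `rowsSorter.Less` could compare raw `_time` strings: for timestamps rendered in RFC3339 with a fixed timezone, lexicographic order coincides with chronological order. A standalone illustration:

```go
package main

import (
	"fmt"
	"sort"
)

func main() {
	// RFC3339 timestamps in the same timezone sort chronologically when
	// compared as plain strings, which is what the deleted Less relied on.
	times := []string{
		"2024-01-02T15:04:05Z",
		"2024-01-01T00:00:00Z",
		"2024-01-01T23:59:59Z",
	}
	sort.Strings(times)
	fmt.Println(times) // [2024-01-01T00:00:00Z 2024-01-01T23:59:59Z 2024-01-02T15:04:05Z]
}
```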
diff --git a/app/vlselect/logsql/sort_writer_test.go b/app/vlselect/logsql/sort_writer_test.go
deleted file mode 100644
index 3c3325726..000000000
--- a/app/vlselect/logsql/sort_writer_test.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package logsql
-
-import (
-	"bytes"
-	"strings"
-	"testing"
-)
-
-func TestSortWriter(t *testing.T) {
-	f := func(maxBufLen, maxLines int, data string, expectedResult string) {
-		t.Helper()
-
-		var bb bytes.Buffer
-		sw := getSortWriter()
-		sw.Init(&bb, maxBufLen, maxLines)
-		for _, s := range strings.Split(data, "\n") {
-			if !sw.TryWrite([]byte(s + "\n")) {
-				break
-			}
-		}
-		sw.FinalFlush()
-		putSortWriter(sw)
-
-		result := bb.String()
-		if result != expectedResult {
-			t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
-		}
-	}
-
-	f(100, 0, "", "")
-	f(100, 0, "{}", "{}\n")
-
-	data := `{"_time":"def","_msg":"xxx"}
-{"_time":"abc","_msg":"foo"}`
-	resultExpected := `{"_time":"abc","_msg":"foo"}
-{"_time":"def","_msg":"xxx"}
-`
-	f(100, 0, data, resultExpected)
-	f(10, 0, data, data+"\n")
-
-	// Test with the maxLines
-	f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
-	f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
-	f(10, 2, data, data+"\n")
-	f(100, 2, data, resultExpected)
-}
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 40f6ce03d..44291699c 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1641,11 +1641,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying
 
 ## Sorting
 
-By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
-if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to 1MB).
-Otherwise sorting is skipped because of performance reasons.
-
-Use [`sort` pipe](#sort-pipe) for sorting the results.
+By default VictoriaLogs doesn't sort the returned results because of performance reasons. Use [`sort` pipe](#sort-pipe) for sorting the results.
 
 ## Limiters
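Note: with automatic sorting removed, clients that relied on it can request sorted output explicitly via the `sort` pipe. A hedged client sketch — the address and default port 9428 are assumptions about a local VictoriaLogs setup; the `/select/logsql/query` endpoint is the one touched by this patch:

```go
package main

import (
	"io"
	"net/http"
	"net/url"
	"os"
)

func main() {
	// Ask VictoriaLogs to sort explicitly instead of relying on the
	// removed -select.maxSortBufferSize behavior.
	params := url.Values{}
	params.Set("query", `_time:5m error | sort by (_time)`)
	resp, err := http.Get("http://localhost:9428/select/logsql/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	_, _ = io.Copy(os.Stdout, resp.Body) // one JSON-encoded log entry per line
}
```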
diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md
index 0f92053f1..3406d5525 100644
--- a/docs/VictoriaLogs/README.md
+++ b/docs/VictoriaLogs/README.md
@@ -252,9 +252,6 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
     The maximum duration for query execution (default 30s)
   -search.maxQueueDuration duration
     The maximum time the search request waits for execution when -search.maxConcurrentRequests limit is reached; see also -search.maxQueryDuration (default 10s)
-  -select.maxSortBufferSize size
-    Query results from /select/logsql/query are automatically sorted by _time if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; too big value for this flag may result in high memory usage since the sorting is performed in memory
-    Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1048576)
   -storageDataPath string
     Path to directory with the VictoriaLogs data; see https://docs.victoriametrics.com/VictoriaLogs/#storage (default "victoria-logs-data")
   -storage.minFreeDiskSpaceBytes size
diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md
index 77ef0004f..2755dac0a 100644
--- a/docs/VictoriaLogs/querying/README.md
+++ b/docs/VictoriaLogs/querying/README.md
@@ -66,10 +66,8 @@ The response can be interrupted at any time by closing the connection to Victori
 This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
 See [these docs](#command-line) for more details.
 
-The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
-if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
-Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
-Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
+The returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
+Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sort-pipe)
 or at client side with the usual `sort` command according to [these docs](#command-line).
 
 By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy) is queried.
diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index e80a3516b..a873a1c0f 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -261,7 +261,8 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
 	}
 
 	// Initialize timestamps, since they are required for all the further work with br.
-	if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") {
+	so := bs.bsw.so
+	if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") {
 		// The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes.
 		rowsLen := bm.onesCount()
 		br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen)
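Note on the new condition in `mustInit`: Go's `&&` binds tighter than `||`, so the expression groups as restated below with explicit parentheses (`skipTimeColumn` is a made-up name for illustration):

```go
// skipTimeColumn restates the condition from blockResult.mustInit: _time may
// stay zero-initialized either when an explicit column list doesn't mention
// it, or when all columns are wanted but _time is explicitly unneeded.
func skipTimeColumn(needAllColumns, neededHasTime, unneededHasTime bool) bool {
	return (!needAllColumns && !neededHasTime) || (needAllColumns && unneededHasTime)
}
```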
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 67faad890..b93a9eaed 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -206,6 +206,15 @@ func (q *Query) String() string {
 	return s
 }
 
+// AddPipeLimit adds `| limit n` pipe to q.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
+func (q *Query) AddPipeLimit(n uint64) {
+	q.pipes = append(q.pipes, &pipeLimit{
+		n: n,
+	})
+}
+
 func (q *Query) getNeededColumns() ([]string, []string) {
 	neededFields := newFieldsSet()
 	neededFields.add("*")
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index 783c638e8..aee2028cf 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -583,7 +583,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 		if ccA == ccB {
 			continue
 		}
-		return cA.c.encodedValues[0] < cB.c.encodedValues[0]
+		if isDesc {
+			return ccB < ccA
+		}
+		return ccA < ccB
 	}
 	if cA.c.isTime && cB.c.isTime {
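Note: the `pipe_sort.go` change replaces the raw `encodedValues` comparison with a direction-aware comparison of the already-computed `ccA`/`ccB` values. A minimal sketch of the pattern (the `uint64` operand type is an assumption for illustration):

```go
// lessOrdered shows the direction-aware comparison used in the fix: for
// descending order the operands are swapped rather than the result negated,
// leaving the equal case (already handled via `continue` above) untouched.
func lessOrdered(a, b uint64, isDesc bool) bool {
	if isDesc {
		return b < a
	}
	return a < b
}
```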