From b1ee7bca1abfc1503ea3f5cc508976cef6f91cd2 Mon Sep 17 00:00:00 2001
From: Aliaksandr Valialkin <valyala@victoriametrics.com>
Date: Tue, 14 May 2024 03:05:03 +0200
Subject: [PATCH] lib/logstorage: work-in-progress

---
 app/vlselect/logsql/buffered_writer.go  |  47 ++++
 app/vlselect/logsql/logsql.go           |  28 +--
 app/vlselect/logsql/sort_writer.go      | 290 ------------------------
 app/vlselect/logsql/sort_writer_test.go |  46 ----
 docs/VictoriaLogs/LogsQL.md             |   6 +-
 docs/VictoriaLogs/README.md             |   3 -
 docs/VictoriaLogs/querying/README.md    |   6 +-
 lib/logstorage/block_result.go          |   3 +-
 lib/logstorage/parser.go                |   9 +
 lib/logstorage/pipe_sort.go             |   5 +-
 10 files changed, 74 insertions(+), 369 deletions(-)
 create mode 100644 app/vlselect/logsql/buffered_writer.go
 delete mode 100644 app/vlselect/logsql/sort_writer.go
 delete mode 100644 app/vlselect/logsql/sort_writer_test.go

diff --git a/app/vlselect/logsql/buffered_writer.go b/app/vlselect/logsql/buffered_writer.go
new file mode 100644
index 0000000000..57ad56199a
--- /dev/null
+++ b/app/vlselect/logsql/buffered_writer.go
@@ -0,0 +1,47 @@
+package logsql
+
+import (
+	"bufio"
+	"io"
+	"sync"
+)
+
+func getBufferedWriter(w io.Writer) *bufferedWriter {
+	v := bufferedWriterPool.Get()
+	if v == nil {
+		return &bufferedWriter{
+			bw: bufio.NewWriter(w),
+		}
+	}
+	bw := v.(*bufferedWriter)
+	bw.bw.Reset(w)
+	return bw
+}
+
+func putBufferedWriter(bw *bufferedWriter) {
+	bw.reset()
+	bufferedWriterPool.Put(bw)
+}
+
+var bufferedWriterPool sync.Pool
+
+type bufferedWriter struct {
+	mu sync.Mutex
+	bw *bufio.Writer
+}
+
+func (bw *bufferedWriter) reset() {
+	// nothing to do
+}
+
+func (bw *bufferedWriter) WriteIgnoreErrors(p []byte) {
+	bw.mu.Lock()
+	_, _ = bw.bw.Write(p)
+	bw.mu.Unlock()
+}
+
+func (bw *bufferedWriter) FlushIgnoreErrors() {
+	bw.mu.Lock()
+	_ = bw.bw.Flush()
+	bw.mu.Unlock()
+}
diff --git a/app/vlselect/logsql/logsql.go b/app/vlselect/logsql/logsql.go
index 0b861ccdac..6cf2b266f0 100644
--- a/app/vlselect/logsql/logsql.go
+++ b/app/vlselect/logsql/logsql.go
@@ -6,18 +6,11 @@ import (
 
 	"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 )
 
-var (
-	maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
-		"if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+
-		"too big value for this flag may result in high memory usage since the sorting is performed in memory")
-)
-
 // ProcessQueryRequest handles /select/logsql/query request.
 func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 	// Extract tenantID
@@ -40,12 +33,13 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 	}
 	w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
 
-	sw := getSortWriter()
-	sw.Init(w, maxSortBufferSize.IntN(), limit)
+	if limit > 0 {
+		q.AddPipeLimit(uint64(limit))
+	}
+
 	tenantIDs := []logstorage.TenantID{tenantID}
 
-	ctxWithCancel, cancel := context.WithCancel(ctx)
-	defer cancel()
+	bw := getBufferedWriter(w)
 
 	writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
 		if len(columns) == 0 {
@@ -56,18 +50,14 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
 		for i := range timestamps {
 			WriteJSONRow(bb, columns, i)
 		}
-
-		if !sw.TryWrite(bb.B) {
-			cancel()
-		}
-
+		bw.WriteIgnoreErrors(bb.B)
 		blockResultPool.Put(bb)
 	}
 
-	err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
+	err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
 
-	sw.FinalFlush()
-	putSortWriter(sw)
+	bw.FlushIgnoreErrors()
+	putBufferedWriter(bw)
 
 	if err != nil {
 		httpserver.Errorf(w, r, "cannot execute query [%s]: %s", qStr, err)
diff --git a/app/vlselect/logsql/sort_writer.go b/app/vlselect/logsql/sort_writer.go
deleted file mode 100644
index 3ad0fbf706..0000000000
--- a/app/vlselect/logsql/sort_writer.go
+++ /dev/null
@@ -1,290 +0,0 @@
-package logsql
-
-import (
-	"bytes"
-	"io"
-	"sort"
-	"sync"
-
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
-	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
-)
-
-func getSortWriter() *sortWriter {
-	v := sortWriterPool.Get()
-	if v == nil {
-		return &sortWriter{}
-	}
-	return v.(*sortWriter)
-}
-
-func putSortWriter(sw *sortWriter) {
-	sw.reset()
-	sortWriterPool.Put(sw)
-}
-
-var sortWriterPool sync.Pool
-
-// sortWriter expects JSON line stream to be written to it.
-//
-// It buffers the incoming data until its size reaches maxBufLen.
-// Then it streams the buffered data and all the incoming data to w.
-//
-// The FinalFlush() must be called when all the data is written.
-// If the buf isn't empty at FinalFlush() call, then the buffered data
-// is sorted by _time field.
-type sortWriter struct {
-	mu sync.Mutex
-	w  io.Writer
-
-	maxLines     int
-	linesWritten int
-
-	maxBufLen  int
-	buf        []byte
-	bufFlushed bool
-
-	hasErr bool
-}
-
-func (sw *sortWriter) reset() {
-	sw.w = nil
-
-	sw.maxLines = 0
-	sw.linesWritten = 0
-
-	sw.maxBufLen = 0
-	sw.buf = sw.buf[:0]
-	sw.bufFlushed = false
-	sw.hasErr = false
-}
-
-// Init initializes sw.
-//
-// If maxLines is set to positive value, then sw accepts up to maxLines
-// and then rejects all the other lines by returning false from TryWrite.
-func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
-	sw.reset()
-
-	sw.w = w
-	sw.maxBufLen = maxBufLen
-	sw.maxLines = maxLines
-}
-
-// TryWrite writes p to sw.
-//
-// True is returned on successful write, false otherwise.
-//
-// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw.
-func (sw *sortWriter) TryWrite(p []byte) bool {
-	sw.mu.Lock()
-	defer sw.mu.Unlock()
-
-	if sw.hasErr {
-		return false
-	}
-
-	if sw.bufFlushed {
-		if !sw.writeToUnderlyingWriterLocked(p) {
-			sw.hasErr = true
-			return false
-		}
-		return true
-	}
-
-	if len(sw.buf)+len(p) < sw.maxBufLen {
-		sw.buf = append(sw.buf, p...)
-		return true
-	}
-
-	sw.bufFlushed = true
-	if !sw.writeToUnderlyingWriterLocked(sw.buf) {
-		sw.hasErr = true
-		return false
-	}
-	sw.buf = sw.buf[:0]
-
-	if !sw.writeToUnderlyingWriterLocked(p) {
-		sw.hasErr = true
-		return false
-	}
-	return true
-}
-
-func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
-	if len(p) == 0 {
-		return true
-	}
-	if sw.maxLines > 0 {
-		if sw.linesWritten >= sw.maxLines {
-			return false
-		}
-		var linesLeft int
-		p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
-		sw.linesWritten += linesLeft
-	}
-	if _, err := sw.w.Write(p); err != nil {
-		return false
-	}
-	return true
-}
-
-func trimLines(p []byte, maxLines int) ([]byte, int) {
-	if maxLines <= 0 {
-		return nil, 0
-	}
-	n := bytes.Count(p, newline)
-	if n < maxLines {
-		return p, n
-	}
-	for n >= maxLines {
-		idx := bytes.LastIndexByte(p, '\n')
-		p = p[:idx]
-		n--
-	}
-	return p[:len(p)+1], maxLines
-}
-
-var newline = []byte("\n")
-
-func (sw *sortWriter) FinalFlush() {
-	if sw.hasErr || sw.bufFlushed {
-		return
-	}
-
-	rs := getRowsSorter()
-	rs.parseRows(sw.buf)
-	rs.sort()
-
-	rows := rs.rows
-	if sw.maxLines > 0 && len(rows) > sw.maxLines {
-		rows = rows[:sw.maxLines]
-	}
-	WriteJSONRows(sw.w, rows)
-
-	putRowsSorter(rs)
-}
-
-func getRowsSorter() *rowsSorter {
-	v := rowsSorterPool.Get()
-	if v == nil {
-		return &rowsSorter{}
-	}
-	return v.(*rowsSorter)
-}
-
-func putRowsSorter(rs *rowsSorter) {
-	rs.reset()
-	rowsSorterPool.Put(rs)
-}
-
-var rowsSorterPool sync.Pool
-
-type rowsSorter struct {
-	buf       []byte
-	fieldsBuf []logstorage.Field
-	rows      [][]logstorage.Field
-	times     []string
-}
-
-func (rs *rowsSorter) reset() {
-	rs.buf = rs.buf[:0]
-
-	fieldsBuf := rs.fieldsBuf
-	for i := range fieldsBuf {
-		fieldsBuf[i].Reset()
-	}
-	rs.fieldsBuf = fieldsBuf[:0]
-
-	rows := rs.rows
-	for i := range rows {
-		rows[i] = nil
-	}
-	rs.rows = rows[:0]
-
-	times := rs.times
-	for i := range times {
-		times[i] = ""
-	}
-	rs.times = times[:0]
-}
-
-func (rs *rowsSorter) parseRows(src []byte) {
-	rs.reset()
-
-	buf := rs.buf
-	fieldsBuf := rs.fieldsBuf
-	rows := rs.rows
-	times := rs.times
-
-	p := logjson.GetParser()
-	for len(src) > 0 {
-		var line []byte
-		n := bytes.IndexByte(src, '\n')
-		if n < 0 {
-			line = src
-			src = nil
-		} else {
-			line = src[:n]
-			src = src[n+1:]
-		}
-		if len(line) == 0 {
-			continue
-		}
-
-		if err := p.ParseLogMessage(line); err != nil {
-			logger.Panicf("BUG: unexpected invalid JSON line: %s", err)
-		}
-
-		timeValue := ""
-		fieldsBufLen := len(fieldsBuf)
-		for _, f := range p.Fields {
-			bufLen := len(buf)
-			buf = append(buf, f.Name...)
-			name := bytesutil.ToUnsafeString(buf[bufLen:])
-
-			bufLen = len(buf)
-			buf = append(buf, f.Value...)
-			value := bytesutil.ToUnsafeString(buf[bufLen:])
-
-			fieldsBuf = append(fieldsBuf, logstorage.Field{
-				Name:  name,
-				Value: value,
-			})
-
-			if name == "_time" {
-				timeValue = value
-			}
-		}
-		rows = append(rows, fieldsBuf[fieldsBufLen:])
-		times = append(times, timeValue)
-	}
-	logjson.PutParser(p)
-
-	rs.buf = buf
-	rs.fieldsBuf = fieldsBuf
-	rs.rows = rows
-	rs.times = times
-}
-
-func (rs *rowsSorter) Len() int {
-	return len(rs.rows)
-}
-
-func (rs *rowsSorter) Less(i, j int) bool {
-	times := rs.times
-	return times[i] < times[j]
-}
-
-func (rs *rowsSorter) Swap(i, j int) {
-	times := rs.times
-	rows := rs.rows
-	times[i], times[j] = times[j], times[i]
-	rows[i], rows[j] = rows[j], rows[i]
-}
-
-func (rs *rowsSorter) sort() {
-	sort.Sort(rs)
-}
diff --git a/app/vlselect/logsql/sort_writer_test.go b/app/vlselect/logsql/sort_writer_test.go
deleted file mode 100644
index 3c33257267..0000000000
--- a/app/vlselect/logsql/sort_writer_test.go
+++ /dev/null
@@ -1,46 +0,0 @@
-package logsql
-
-import (
-	"bytes"
-	"strings"
-	"testing"
-)
-
-func TestSortWriter(t *testing.T) {
-	f := func(maxBufLen, maxLines int, data string, expectedResult string) {
-		t.Helper()
-
-		var bb bytes.Buffer
-		sw := getSortWriter()
-		sw.Init(&bb, maxBufLen, maxLines)
-		for _, s := range strings.Split(data, "\n") {
-			if !sw.TryWrite([]byte(s + "\n")) {
-				break
-			}
-		}
-		sw.FinalFlush()
-		putSortWriter(sw)
-
-		result := bb.String()
-		if result != expectedResult {
-			t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
-		}
-	}
-
-	f(100, 0, "", "")
-	f(100, 0, "{}", "{}\n")
-
-	data := `{"_time":"def","_msg":"xxx"}
-{"_time":"abc","_msg":"foo"}`
-	resultExpected := `{"_time":"abc","_msg":"foo"}
-{"_time":"def","_msg":"xxx"}
-`
-	f(100, 0, data, resultExpected)
-	f(10, 0, data, data+"\n")
-
-	// Test with the maxLines
-	f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
-	f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
-	f(10, 2, data, data+"\n")
-	f(100, 2, data, resultExpected)
-}
diff --git a/docs/VictoriaLogs/LogsQL.md b/docs/VictoriaLogs/LogsQL.md
index 40f6ce03df..44291699c0 100644
--- a/docs/VictoriaLogs/LogsQL.md
+++ b/docs/VictoriaLogs/LogsQL.md
@@ -1641,11 +1641,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying
 
 ## Sorting
 
-By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
-if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to 1MB).
-Otherwise sorting is skipped because of performance reasons.
-
-Use [`sort` pipe](#sort-pipe) for sorting the results.
+By default VictoriaLogs doesn't sort the returned results for performance reasons. Use [`sort` pipe](#sort-pipe) for sorting the results.
 
 ## Limiters
 
diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md
index 0f92053f12..3406d55253 100644
--- a/docs/VictoriaLogs/README.md
+++ b/docs/VictoriaLogs/README.md
@@ -252,9 +252,6 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
     	The maximum duration for query execution (default 30s)
   -search.maxQueueDuration duration
     	The maximum time the search request waits for execution when -search.maxConcurrentRequests limit is reached; see also -search.maxQueryDuration (default 10s)
-  -select.maxSortBufferSize size
-    	Query results from /select/logsql/query are automatically sorted by _time if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; too big value for this flag may result in high memory usage since the sorting is performed in memory
-    	Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1048576)
   -storageDataPath string
     	Path to directory with the VictoriaLogs data; see https://docs.victoriametrics.com/VictoriaLogs/#storage (default "victoria-logs-data")
   -storage.minFreeDiskSpaceBytes size
diff --git a/docs/VictoriaLogs/querying/README.md b/docs/VictoriaLogs/querying/README.md
index 77ef0004fb..2755dac0a1 100644
--- a/docs/VictoriaLogs/querying/README.md
+++ b/docs/VictoriaLogs/querying/README.md
@@ -66,10 +66,8 @@ The response can be interrupted at any time by closing the connection to Victori
 This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
 See [these docs](#command-line) for more details.
 
-The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
-if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
-Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
-Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
+The returned lines aren't sorted, since sorting disables the ability to send matching log entries to the response stream as soon as they are found.
+Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sort-pipe)
 or at client side with the usual `sort` command according to [these docs](#command-line).
 
 By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy) is queried.
diff --git a/lib/logstorage/block_result.go b/lib/logstorage/block_result.go
index e80a3516b9..a873a1c0ff 100644
--- a/lib/logstorage/block_result.go
+++ b/lib/logstorage/block_result.go
@@ -261,7 +261,8 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
 	}
 
 	// Initialize timestamps, since they are required for all the further work with br.
-	if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") {
+	so := bs.bsw.so
+	if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") {
 		// The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes.
 		rowsLen := bm.onesCount()
 		br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen)
diff --git a/lib/logstorage/parser.go b/lib/logstorage/parser.go
index 67faad8906..b93a9eaeda 100644
--- a/lib/logstorage/parser.go
+++ b/lib/logstorage/parser.go
@@ -206,6 +206,15 @@ func (q *Query) String() string {
 	return s
 }
 
+// AddPipeLimit adds `| limit n` pipe to q.
+//
+// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
+func (q *Query) AddPipeLimit(n uint64) {
+	q.pipes = append(q.pipes, &pipeLimit{
+		n: n,
+	})
+}
+
 func (q *Query) getNeededColumns() ([]string, []string) {
 	neededFields := newFieldsSet()
 	neededFields.add("*")
diff --git a/lib/logstorage/pipe_sort.go b/lib/logstorage/pipe_sort.go
index 783c638e81..aee2028cfb 100644
--- a/lib/logstorage/pipe_sort.go
+++ b/lib/logstorage/pipe_sort.go
@@ -583,7 +583,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
 			if ccA == ccB {
 				continue
 			}
-			return cA.c.encodedValues[0] < cB.c.encodedValues[0]
+			if isDesc {
+				return ccB < ccA
+			}
+			return ccA < ccB
 		}
 
 		if cA.c.isTime && cB.c.isTime {