mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/logstorage: work-in-progress
This commit is contained in:
parent
cb35e62e04
commit
da3af090c6
10 changed files with 74 additions and 369 deletions
47
app/vlselect/logsql/buffered_writer.go
Normal file
47
app/vlselect/logsql/buffered_writer.go
Normal file
|
@ -0,0 +1,47 @@
|
|||
package logsql
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
|
||||
func getBufferedWriter(w io.Writer) *bufferedWriter {
|
||||
v := bufferedWriterPool.Get()
|
||||
if v == nil {
|
||||
return &bufferedWriter{
|
||||
bw: bufio.NewWriter(w),
|
||||
}
|
||||
}
|
||||
bw := v.(*bufferedWriter)
|
||||
bw.bw.Reset(w)
|
||||
return bw
|
||||
}
|
||||
|
||||
func putBufferedWriter(bw *bufferedWriter) {
|
||||
bw.reset()
|
||||
bufferedWriterPool.Put(bw)
|
||||
}
|
||||
|
||||
var bufferedWriterPool sync.Pool
|
||||
|
||||
type bufferedWriter struct {
|
||||
mu sync.Mutex
|
||||
bw *bufio.Writer
|
||||
}
|
||||
|
||||
func (bw *bufferedWriter) reset() {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
func (bw *bufferedWriter) WriteIgnoreErrors(p []byte) {
|
||||
bw.mu.Lock()
|
||||
_, _ = bw.bw.Write(p)
|
||||
bw.mu.Unlock()
|
||||
}
|
||||
|
||||
func (bw *bufferedWriter) FlushIgnoreErrors() {
|
||||
bw.mu.Lock()
|
||||
_ = bw.bw.Flush()
|
||||
bw.mu.Unlock()
|
||||
}
|
|
@ -6,18 +6,11 @@ import (
|
|||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vlstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
)
|
||||
|
||||
var (
|
||||
maxSortBufferSize = flagutil.NewBytes("select.maxSortBufferSize", 1024*1024, "Query results from /select/logsql/query are automatically sorted by _time "+
|
||||
"if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; "+
|
||||
"too big value for this flag may result in high memory usage since the sorting is performed in memory")
|
||||
)
|
||||
|
||||
// ProcessQueryRequest handles /select/logsql/query request.
|
||||
func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
|
||||
// Extract tenantID
|
||||
|
@ -40,12 +33,13 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
|||
}
|
||||
w.Header().Set("Content-Type", "application/stream+json; charset=utf-8")
|
||||
|
||||
sw := getSortWriter()
|
||||
sw.Init(w, maxSortBufferSize.IntN(), limit)
|
||||
if limit > 0 {
|
||||
q.AddPipeLimit(uint64(limit))
|
||||
}
|
||||
|
||||
tenantIDs := []logstorage.TenantID{tenantID}
|
||||
|
||||
ctxWithCancel, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
bw := getBufferedWriter(w)
|
||||
|
||||
writeBlock := func(_ uint, timestamps []int64, columns []logstorage.BlockColumn) {
|
||||
if len(columns) == 0 {
|
||||
|
@ -56,18 +50,14 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
|
|||
for i := range timestamps {
|
||||
WriteJSONRow(bb, columns, i)
|
||||
}
|
||||
|
||||
if !sw.TryWrite(bb.B) {
|
||||
cancel()
|
||||
}
|
||||
|
||||
bw.WriteIgnoreErrors(bb.B)
|
||||
blockResultPool.Put(bb)
|
||||
}
|
||||
|
||||
err = vlstorage.RunQuery(ctxWithCancel, tenantIDs, q, writeBlock)
|
||||
err = vlstorage.RunQuery(ctx, tenantIDs, q, writeBlock)
|
||||
|
||||
sw.FinalFlush()
|
||||
putSortWriter(sw)
|
||||
bw.FlushIgnoreErrors()
|
||||
putBufferedWriter(bw)
|
||||
|
||||
if err != nil {
|
||||
httpserver.Errorf(w, r, "cannot execute query [%s]: %s", qStr, err)
|
||||
|
|
|
@ -1,290 +0,0 @@
|
|||
package logsql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logjson"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
|
||||
)
|
||||
|
||||
func getSortWriter() *sortWriter {
|
||||
v := sortWriterPool.Get()
|
||||
if v == nil {
|
||||
return &sortWriter{}
|
||||
}
|
||||
return v.(*sortWriter)
|
||||
}
|
||||
|
||||
func putSortWriter(sw *sortWriter) {
|
||||
sw.reset()
|
||||
sortWriterPool.Put(sw)
|
||||
}
|
||||
|
||||
var sortWriterPool sync.Pool
|
||||
|
||||
// sortWriter expects JSON line stream to be written to it.
|
||||
//
|
||||
// It buffers the incoming data until its size reaches maxBufLen.
|
||||
// Then it streams the buffered data and all the incoming data to w.
|
||||
//
|
||||
// The FinalFlush() must be called when all the data is written.
|
||||
// If the buf isn't empty at FinalFlush() call, then the buffered data
|
||||
// is sorted by _time field.
|
||||
type sortWriter struct {
|
||||
mu sync.Mutex
|
||||
w io.Writer
|
||||
|
||||
maxLines int
|
||||
linesWritten int
|
||||
|
||||
maxBufLen int
|
||||
buf []byte
|
||||
bufFlushed bool
|
||||
|
||||
hasErr bool
|
||||
}
|
||||
|
||||
func (sw *sortWriter) reset() {
|
||||
sw.w = nil
|
||||
|
||||
sw.maxLines = 0
|
||||
sw.linesWritten = 0
|
||||
|
||||
sw.maxBufLen = 0
|
||||
sw.buf = sw.buf[:0]
|
||||
sw.bufFlushed = false
|
||||
sw.hasErr = false
|
||||
}
|
||||
|
||||
// Init initializes sw.
|
||||
//
|
||||
// If maxLines is set to positive value, then sw accepts up to maxLines
|
||||
// and then rejects all the other lines by returning false from TryWrite.
|
||||
func (sw *sortWriter) Init(w io.Writer, maxBufLen, maxLines int) {
|
||||
sw.reset()
|
||||
|
||||
sw.w = w
|
||||
sw.maxBufLen = maxBufLen
|
||||
sw.maxLines = maxLines
|
||||
}
|
||||
|
||||
// TryWrite writes p to sw.
|
||||
//
|
||||
// True is returned on successful write, false otherwise.
|
||||
//
|
||||
// Unsuccessful write may occur on underlying write error or when maxLines lines are already written to sw.
|
||||
func (sw *sortWriter) TryWrite(p []byte) bool {
|
||||
sw.mu.Lock()
|
||||
defer sw.mu.Unlock()
|
||||
|
||||
if sw.hasErr {
|
||||
return false
|
||||
}
|
||||
|
||||
if sw.bufFlushed {
|
||||
if !sw.writeToUnderlyingWriterLocked(p) {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
if len(sw.buf)+len(p) < sw.maxBufLen {
|
||||
sw.buf = append(sw.buf, p...)
|
||||
return true
|
||||
}
|
||||
|
||||
sw.bufFlushed = true
|
||||
if !sw.writeToUnderlyingWriterLocked(sw.buf) {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
sw.buf = sw.buf[:0]
|
||||
|
||||
if !sw.writeToUnderlyingWriterLocked(p) {
|
||||
sw.hasErr = true
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (sw *sortWriter) writeToUnderlyingWriterLocked(p []byte) bool {
|
||||
if len(p) == 0 {
|
||||
return true
|
||||
}
|
||||
if sw.maxLines > 0 {
|
||||
if sw.linesWritten >= sw.maxLines {
|
||||
return false
|
||||
}
|
||||
var linesLeft int
|
||||
p, linesLeft = trimLines(p, sw.maxLines-sw.linesWritten)
|
||||
sw.linesWritten += linesLeft
|
||||
}
|
||||
if _, err := sw.w.Write(p); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func trimLines(p []byte, maxLines int) ([]byte, int) {
|
||||
if maxLines <= 0 {
|
||||
return nil, 0
|
||||
}
|
||||
n := bytes.Count(p, newline)
|
||||
if n < maxLines {
|
||||
return p, n
|
||||
}
|
||||
for n >= maxLines {
|
||||
idx := bytes.LastIndexByte(p, '\n')
|
||||
p = p[:idx]
|
||||
n--
|
||||
}
|
||||
return p[:len(p)+1], maxLines
|
||||
}
|
||||
|
||||
var newline = []byte("\n")
|
||||
|
||||
func (sw *sortWriter) FinalFlush() {
|
||||
if sw.hasErr || sw.bufFlushed {
|
||||
return
|
||||
}
|
||||
|
||||
rs := getRowsSorter()
|
||||
rs.parseRows(sw.buf)
|
||||
rs.sort()
|
||||
|
||||
rows := rs.rows
|
||||
if sw.maxLines > 0 && len(rows) > sw.maxLines {
|
||||
rows = rows[:sw.maxLines]
|
||||
}
|
||||
WriteJSONRows(sw.w, rows)
|
||||
|
||||
putRowsSorter(rs)
|
||||
}
|
||||
|
||||
func getRowsSorter() *rowsSorter {
|
||||
v := rowsSorterPool.Get()
|
||||
if v == nil {
|
||||
return &rowsSorter{}
|
||||
}
|
||||
return v.(*rowsSorter)
|
||||
}
|
||||
|
||||
func putRowsSorter(rs *rowsSorter) {
|
||||
rs.reset()
|
||||
rowsSorterPool.Put(rs)
|
||||
}
|
||||
|
||||
var rowsSorterPool sync.Pool
|
||||
|
||||
type rowsSorter struct {
|
||||
buf []byte
|
||||
fieldsBuf []logstorage.Field
|
||||
rows [][]logstorage.Field
|
||||
times []string
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) reset() {
|
||||
rs.buf = rs.buf[:0]
|
||||
|
||||
fieldsBuf := rs.fieldsBuf
|
||||
for i := range fieldsBuf {
|
||||
fieldsBuf[i].Reset()
|
||||
}
|
||||
rs.fieldsBuf = fieldsBuf[:0]
|
||||
|
||||
rows := rs.rows
|
||||
for i := range rows {
|
||||
rows[i] = nil
|
||||
}
|
||||
rs.rows = rows[:0]
|
||||
|
||||
times := rs.times
|
||||
for i := range times {
|
||||
times[i] = ""
|
||||
}
|
||||
rs.times = times[:0]
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) parseRows(src []byte) {
|
||||
rs.reset()
|
||||
|
||||
buf := rs.buf
|
||||
fieldsBuf := rs.fieldsBuf
|
||||
rows := rs.rows
|
||||
times := rs.times
|
||||
|
||||
p := logjson.GetParser()
|
||||
for len(src) > 0 {
|
||||
var line []byte
|
||||
n := bytes.IndexByte(src, '\n')
|
||||
if n < 0 {
|
||||
line = src
|
||||
src = nil
|
||||
} else {
|
||||
line = src[:n]
|
||||
src = src[n+1:]
|
||||
}
|
||||
if len(line) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if err := p.ParseLogMessage(line); err != nil {
|
||||
logger.Panicf("BUG: unexpected invalid JSON line: %s", err)
|
||||
}
|
||||
|
||||
timeValue := ""
|
||||
fieldsBufLen := len(fieldsBuf)
|
||||
for _, f := range p.Fields {
|
||||
bufLen := len(buf)
|
||||
buf = append(buf, f.Name...)
|
||||
name := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
bufLen = len(buf)
|
||||
buf = append(buf, f.Value...)
|
||||
value := bytesutil.ToUnsafeString(buf[bufLen:])
|
||||
|
||||
fieldsBuf = append(fieldsBuf, logstorage.Field{
|
||||
Name: name,
|
||||
Value: value,
|
||||
})
|
||||
|
||||
if name == "_time" {
|
||||
timeValue = value
|
||||
}
|
||||
}
|
||||
rows = append(rows, fieldsBuf[fieldsBufLen:])
|
||||
times = append(times, timeValue)
|
||||
}
|
||||
logjson.PutParser(p)
|
||||
|
||||
rs.buf = buf
|
||||
rs.fieldsBuf = fieldsBuf
|
||||
rs.rows = rows
|
||||
rs.times = times
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) Len() int {
|
||||
return len(rs.rows)
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) Less(i, j int) bool {
|
||||
times := rs.times
|
||||
return times[i] < times[j]
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) Swap(i, j int) {
|
||||
times := rs.times
|
||||
rows := rs.rows
|
||||
times[i], times[j] = times[j], times[i]
|
||||
rows[i], rows[j] = rows[j], rows[i]
|
||||
}
|
||||
|
||||
func (rs *rowsSorter) sort() {
|
||||
sort.Sort(rs)
|
||||
}
|
|
@ -1,46 +0,0 @@
|
|||
package logsql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSortWriter(t *testing.T) {
|
||||
f := func(maxBufLen, maxLines int, data string, expectedResult string) {
|
||||
t.Helper()
|
||||
|
||||
var bb bytes.Buffer
|
||||
sw := getSortWriter()
|
||||
sw.Init(&bb, maxBufLen, maxLines)
|
||||
for _, s := range strings.Split(data, "\n") {
|
||||
if !sw.TryWrite([]byte(s + "\n")) {
|
||||
break
|
||||
}
|
||||
}
|
||||
sw.FinalFlush()
|
||||
putSortWriter(sw)
|
||||
|
||||
result := bb.String()
|
||||
if result != expectedResult {
|
||||
t.Fatalf("unexpected result;\ngot\n%s\nwant\n%s", result, expectedResult)
|
||||
}
|
||||
}
|
||||
|
||||
f(100, 0, "", "")
|
||||
f(100, 0, "{}", "{}\n")
|
||||
|
||||
data := `{"_time":"def","_msg":"xxx"}
|
||||
{"_time":"abc","_msg":"foo"}`
|
||||
resultExpected := `{"_time":"abc","_msg":"foo"}
|
||||
{"_time":"def","_msg":"xxx"}
|
||||
`
|
||||
f(100, 0, data, resultExpected)
|
||||
f(10, 0, data, data+"\n")
|
||||
|
||||
// Test with the maxLines
|
||||
f(100, 1, data, `{"_time":"abc","_msg":"foo"}`+"\n")
|
||||
f(10, 1, data, `{"_time":"def","_msg":"xxx"}`+"\n")
|
||||
f(10, 2, data, data+"\n")
|
||||
f(100, 2, data, resultExpected)
|
||||
}
|
|
@ -1641,11 +1641,7 @@ according to [these docs](https://docs.victoriametrics.com/VictoriaLogs/querying
|
|||
|
||||
## Sorting
|
||||
|
||||
By default VictoriaLogs sorts the returned results by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
|
||||
if their total size doesn't exceed `-select.maxSortBufferSize` command-line value (by default it is set to 1MB).
|
||||
Otherwise sorting is skipped because of performance reasons.
|
||||
|
||||
Use [`sort` pipe](#sort-pipe) for sorting the results.
|
||||
By default VictoriaLogs doesn't sort the returned results because of performance reasons. Use [`sort` pipe](#sort-pipe) for sorting the results.
|
||||
|
||||
## Limiters
|
||||
|
||||
|
|
|
@ -252,9 +252,6 @@ Pass `-help` to VictoriaLogs in order to see the list of supported command-line
|
|||
The maximum duration for query execution (default 30s)
|
||||
-search.maxQueueDuration duration
|
||||
The maximum time the search request waits for execution when -search.maxConcurrentRequests limit is reached; see also -search.maxQueryDuration (default 10s)
|
||||
-select.maxSortBufferSize size
|
||||
Query results from /select/logsql/query are automatically sorted by _time if their summary size doesn't exceed this value; otherwise, query results are streamed in the response without sorting; too big value for this flag may result in high memory usage since the sorting is performed in memory
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 1048576)
|
||||
-storageDataPath string
|
||||
Path to directory with the VictoriaLogs data; see https://docs.victoriametrics.com/VictoriaLogs/#storage (default "victoria-logs-data")
|
||||
-storage.minFreeDiskSpaceBytes size
|
||||
|
|
|
@ -66,10 +66,8 @@ The response can be interrupted at any time by closing the connection to Victori
|
|||
This allows post-processing the returned lines at the client side with the usual Unix commands such as `grep`, `jq`, `less`, `head`, etc.
|
||||
See [these docs](#command-line) for more details.
|
||||
|
||||
The returned lines are sorted by [`_time` field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field)
|
||||
if their total size doesn't exceed `-select.maxSortBufferSize` command-line flag value (by default it is set to one megabyte).
|
||||
Otherwise the returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
||||
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sorting)
|
||||
The returned lines aren't sorted, since sorting disables the ability to send matching log entries to response stream as soon as they are found.
|
||||
Query results can be sorted either at VictoriaLogs side according [to these docs](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html#sort-pipe)
|
||||
or at client side with the usual `sort` command according to [these docs](#command-line).
|
||||
|
||||
By default the `(AccountID=0, ProjectID=0)` [tenant](https://docs.victoriametrics.com/VictoriaLogs/#multitenancy) is queried.
|
||||
|
|
|
@ -261,7 +261,8 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
|
|||
}
|
||||
|
||||
// Initialize timestamps, since they are required for all the further work with br.
|
||||
if !slices.Contains(bs.bsw.so.neededColumnNames, "_time") || slices.Contains(bs.bsw.so.unneededColumnNames, "_time") {
|
||||
so := bs.bsw.so
|
||||
if !so.needAllColumns && !slices.Contains(so.neededColumnNames, "_time") || so.needAllColumns && slices.Contains(so.unneededColumnNames, "_time") {
|
||||
// The fastest path - _time column wasn't requested, so it is enough to initialize br.timestamps with zeroes.
|
||||
rowsLen := bm.onesCount()
|
||||
br.timestamps = fastnum.AppendInt64Zeros(br.timestamps[:0], rowsLen)
|
||||
|
|
|
@ -206,6 +206,15 @@ func (q *Query) String() string {
|
|||
return s
|
||||
}
|
||||
|
||||
// AddPipeLimit adds `| limit n` pipe to q.
|
||||
//
|
||||
// See https://docs.victoriametrics.com/victorialogs/logsql/#limit-pipe
|
||||
func (q *Query) AddPipeLimit(n uint64) {
|
||||
q.pipes = append(q.pipes, &pipeLimit{
|
||||
n: n,
|
||||
})
|
||||
}
|
||||
|
||||
func (q *Query) getNeededColumns() ([]string, []string) {
|
||||
neededFields := newFieldsSet()
|
||||
neededFields.add("*")
|
||||
|
|
|
@ -583,7 +583,10 @@ func sortBlockLess(shardA *pipeSortProcessorShard, rowIdxA int, shardB *pipeSort
|
|||
if ccA == ccB {
|
||||
continue
|
||||
}
|
||||
return cA.c.encodedValues[0] < cB.c.encodedValues[0]
|
||||
if isDesc {
|
||||
return ccB < ccA
|
||||
}
|
||||
return ccA < ccB
|
||||
}
|
||||
|
||||
if cA.c.isTime && cB.c.isTime {
|
||||
|
|
Loading…
Reference in a new issue