// Package stream parses the VictoriaMetrics native protocol stream
// received at /api/v1/import/native.
package stream
import (
"bufio"
"fmt"
"io"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/protoparserutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
"github.com/VictoriaMetrics/metrics"
)
// Parse parses /api/v1/import/native lines from req and calls callback for parsed blocks.
//
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold block after returning.
app/vlinsert: follow-up for 37ed1842abec8109b8d32d42587301aca1c95619 - Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
2025-03-14 17:10:37 +00:00
func Parse(r io.Reader, contentEncoding string, callback func(block *Block) error) error {
reader, err := protoparserutil.GetUncompressedReader(r, contentEncoding)
app/vlinsert: follow-up for 37ed1842abec8109b8d32d42587301aca1c95619 - Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
2025-03-14 17:10:37 +00:00
if err != nil {
return fmt.Errorf("cannot decode vmimport data: %w", err)
}
defer protoparserutil.PutUncompressedReader(reader)
app/vlinsert: follow-up for 37ed1842abec8109b8d32d42587301aca1c95619 - Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
2025-03-14 17:10:37 +00:00
wcr := writeconcurrencylimiter.GetReader(reader)
defer writeconcurrencylimiter.PutReader(wcr)
app/vlinsert: follow-up for 37ed1842abec8109b8d32d42587301aca1c95619 - Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
2025-03-14 17:10:37 +00:00
reader = wcr
app/vlinsert: follow-up for 37ed1842abec8109b8d32d42587301aca1c95619 - Properly decode protobuf-encoded Loki request if it has no Content-Encoding header. Protobuf Loki message is snappy-encoded by default, so snappy decoding must be used when Content-Encoding header is missing. - Return back the previous signatures of parseJSONRequest and parseProtobufRequest functions. This eliminates the churn in tests for these functions. This also fixes broken benchmarks BenchmarkParseJSONRequest and BenchmarkParseProtobufRequest, which consume the whole request body on the first iteration and do nothing on subsequent iterations. - Put the CHANGELOG entries into correct places, since they were incorrectly put into already released versions of VictoriaMetrics and VictoriaLogs. - Add support for reading zstd-compressed data ingestion requests into the remaining protocols at VictoriaLogs and VictoriaMetrics. - Remove the `encoding` arg from PutUncompressedReader() - it has enough information about the passed reader arg in order to properly deal with it. - Add ReadUncompressedData to lib/protoparser/common for reading uncompressed data from the reader until EOF. This allows removing repeated code across request-based protocol parsers without streaming mode. - Consistently limit data ingestion request sizes, which can be read by ReadUncompressedData function. Previously this wasn't the case for all the supported protocols. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/pull/8416 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8380 Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8300
2025-03-14 17:10:37 +00:00
br := getBufferedReader(reader)
defer putBufferedReader(br)
// Read time range (tr)
trBuf := make([]byte, 16)
var tr storage.TimeRange
if _, err := io.ReadFull(br, trBuf); err != nil {
readErrors.Inc()
return fmt.Errorf("cannot read time range: %w", err)
}
tr.MinTimestamp = encoding.UnmarshalInt64(trBuf)
tr.MaxTimestamp = encoding.UnmarshalInt64(trBuf[8:])
// Read native blocks and feed workers with work.
sizeBuf := make([]byte, 4)
ctx := &streamContext{}
for {
uw := getUnmarshalWork()
uw.tr = tr
uw.ctx = ctx
uw.callback = callback
// Read uw.metricNameBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
if err == io.EOF {
// End of stream
putUnmarshalWork(uw)
ctx.wg.Wait()
return ctx.err
}
readErrors.Inc()
ctx.wg.Wait()
return fmt.Errorf("cannot read metricName size: %w", err)
}
readCalls.Inc()
bufSize := encoding.UnmarshalUint32(sizeBuf)
if bufSize > 1024*1024 {
parseErrors.Inc()
ctx.wg.Wait()
return fmt.Errorf("too big metricName size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
}
uw.metricNameBuf = bytesutil.ResizeNoCopyMayOverallocate(uw.metricNameBuf, int(bufSize))
if _, err := io.ReadFull(br, uw.metricNameBuf); err != nil {
readErrors.Inc()
ctx.wg.Wait()
return fmt.Errorf("cannot read metricName with size %d bytes: %w", bufSize, err)
}
readCalls.Inc()
// Read uw.blockBuf
if _, err := io.ReadFull(br, sizeBuf); err != nil {
readErrors.Inc()
ctx.wg.Wait()
return fmt.Errorf("cannot read native block size: %w", err)
}
readCalls.Inc()
bufSize = encoding.UnmarshalUint32(sizeBuf)
if bufSize > 1024*1024 {
parseErrors.Inc()
ctx.wg.Wait()
return fmt.Errorf("too big native block size; got %d; shouldn't exceed %d", bufSize, 1024*1024)
}
uw.blockBuf = bytesutil.ResizeNoCopyMayOverallocate(uw.blockBuf, int(bufSize))
if _, err := io.ReadFull(br, uw.blockBuf); err != nil {
readErrors.Inc()
ctx.wg.Wait()
return fmt.Errorf("cannot read native block with size %d bytes: %w", bufSize, err)
}
readCalls.Inc()
blocksRead.Inc()
ctx.wg.Add(1)
protoparserutil.ScheduleUnmarshalWork(uw)
wcr.DecConcurrency()
}
}
// streamContext tracks in-flight unmarshal workers spawned by Parse
// and collects the first error they encounter.
type streamContext struct {
	// wg counts scheduled unmarshalWork items that haven't finished yet.
	wg sync.WaitGroup
	// errLock protects err, which may be set concurrently by workers.
	errLock sync.Mutex
	// err holds the first error reported by a worker; later errors are dropped.
	err error
}
// Block is a single block from `/api/v1/import/native` request.
type Block struct {
	// MetricName is the metric name the block belongs to.
	MetricName storage.MetricName
	// Values holds the sample values; Values[i] corresponds to Timestamps[i].
	Values []float64
	// Timestamps holds the sample timestamps in milliseconds.
	Timestamps []int64
}
// reset clears b so it can be safely reused, retaining the allocated
// Values and Timestamps capacity.
func (b *Block) reset() {
	b.Timestamps = b.Timestamps[:0]
	b.Values = b.Values[:0]
	b.MetricName.Reset()
}
var (
	// readCalls counts successful reads of size prefixes and payloads.
	readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="native"}`)
	// readErrors counts failed reads from the input stream.
	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="native"}`)
	// rowsRead counts rows extracted from native blocks after time-range filtering.
	rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="native"}`)
	// blocksRead counts fully read native blocks.
	blocksRead = metrics.NewCounter(`vm_protoparser_blocks_read_total{type="native"}`)
	// parseErrors counts malformed input (oversized prefixes, unmarshal failures).
	parseErrors = metrics.NewCounter(`vm_protoparser_parse_errors_total{type="native"}`)
	// processErrors counts errors returned by unmarshaling or the user callback.
	processErrors = metrics.NewCounter(`vm_protoparser_process_errors_total{type="native"}`)
)
// unmarshalWork is a unit of work scheduled on the shared unmarshal workers:
// raw metricName and block bytes to be decoded and passed to callback.
type unmarshalWork struct {
	// tr is the time range used for filtering rows out of the block.
	tr storage.TimeRange
	// ctx is the stream context for error reporting and completion tracking.
	ctx *streamContext
	// callback is invoked with the decoded block.
	callback func(block *Block) error
	// metricNameBuf holds the raw marshaled metric name.
	metricNameBuf []byte
	// blockBuf holds the raw marshaled native block.
	blockBuf []byte
	// block is the decode destination passed to callback.
	block Block
}
// reset clears uw for reuse via unmarshalWorkPool, keeping the allocated
// buffer capacity. uw.tr isn't cleared since it is overwritten on every use.
func (uw *unmarshalWork) reset() {
	uw.callback = nil
	uw.ctx = nil
	uw.block.reset()
	uw.blockBuf = uw.blockBuf[:0]
	uw.metricNameBuf = uw.metricNameBuf[:0]
}
// Unmarshal implements protoparserutil.UnmarshalWork
func (uw *unmarshalWork) Unmarshal() {
	err := uw.unmarshal()
	if err != nil {
		parseErrors.Inc()
	} else {
		// Decoding succeeded - hand the block to the user callback.
		err = uw.callback(&uw.block)
	}
	// Capture ctx before putUnmarshalWork below resets uw.ctx to nil.
	ctx := uw.ctx
	if err != nil {
		processErrors.Inc()
		// Record only the first error; Parse returns it after wg.Wait().
		ctx.errLock.Lock()
		if ctx.err == nil {
			ctx.err = fmt.Errorf("error when processing native block: %w", err)
		}
		ctx.errLock.Unlock()
	}
	ctx.wg.Done()
	putUnmarshalWork(uw)
}
// unmarshal decodes uw.metricNameBuf and uw.blockBuf into uw.block,
// keeping only the rows that fall into the uw.tr time range.
func (uw *unmarshalWork) unmarshal() error {
	dst := &uw.block
	if err := dst.MetricName.Unmarshal(uw.metricNameBuf); err != nil {
		return fmt.Errorf("cannot unmarshal metricName from %d bytes: %w", len(uw.metricNameBuf), err)
	}
	// Use a pooled storage.Block as scratch space for decoding.
	sb := blockPool.Get().(*storage.Block)
	defer blockPool.Put(sb)
	tail, err := sb.UnmarshalPortable(uw.blockBuf)
	if err != nil {
		return fmt.Errorf("cannot unmarshal native block from %d bytes: %w", len(uw.blockBuf), err)
	}
	if len(tail) > 0 {
		return fmt.Errorf("unexpected non-empty tail left after unmarshaling native block from %d bytes; len(tail)=%d bytes", len(uw.blockBuf), len(tail))
	}
	dst.Timestamps, dst.Values = sb.AppendRowsWithTimeRangeFilter(dst.Timestamps[:0], dst.Values[:0], uw.tr)
	rowsRead.Add(len(dst.Timestamps))
	return nil
}
// blockPool recycles scratch storage.Block objects used while decoding
// native blocks in unmarshalWork.unmarshal.
var blockPool = &sync.Pool{
	New: func() any {
		return &storage.Block{}
	},
}
// unmarshalWorkPool recycles unmarshalWork objects across Parse calls.
var unmarshalWorkPool sync.Pool

// getUnmarshalWork returns a reusable unmarshalWork from the pool,
// allocating a fresh one if the pool is empty.
func getUnmarshalWork() *unmarshalWork {
	if v := unmarshalWorkPool.Get(); v != nil {
		return v.(*unmarshalWork)
	}
	return &unmarshalWork{}
}

// putUnmarshalWork resets uw and returns it to the pool for reuse.
func putUnmarshalWork(uw *unmarshalWork) {
	uw.reset()
	unmarshalWorkPool.Put(uw)
}
// bufferedReaderPool recycles 64KiB bufio.Reader objects across Parse calls.
var bufferedReaderPool sync.Pool

// getBufferedReader returns a pooled bufio.Reader wrapping r,
// creating a new one with a 64KiB buffer if the pool is empty.
func getBufferedReader(r io.Reader) *bufio.Reader {
	if v := bufferedReaderPool.Get(); v != nil {
		br := v.(*bufio.Reader)
		br.Reset(r)
		return br
	}
	return bufio.NewReaderSize(r, 64*1024)
}

// putBufferedReader releases br to the pool, dropping the reference to the
// underlying reader so it can be garbage collected.
func putBufferedReader(br *bufio.Reader) {
	br.Reset(nil)
	bufferedReaderPool.Put(br)
}