app/vmselect: improve performance scalability on multi-CPU systems for /api/v1/export/... endpoints

This commit is contained in:
Aliaksandr Valialkin 2022-10-01 22:05:43 +03:00
parent 5497997b72
commit 43bdd96a6e
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
7 changed files with 245 additions and 282 deletions

View file

@ -112,7 +112,7 @@ func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *stor
bi.workCh = make(chan workItem, 16) bi.workCh = make(chan workItem, 16)
bi.wg.Add(1) bi.wg.Add(1)
go func() { go func() {
_, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerIdx int) error { _, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerID uint) error {
wi := workItem{ wi := workItem{
mb: mb, mb: mb,
doneCh: make(chan struct{}), doneCh: make(chan struct{}),

View file

@ -662,9 +662,9 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli
} }
// Push mrs to storage nodes in parallel. // Push mrs to storage nodes in parallel.
snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.registerMetricNamesRequests.Inc() sn.registerMetricNamesRequests.Inc()
err := sn.registerMetricNames(qt, mrsPerNode[workerIdx], deadline) err := sn.registerMetricNames(qt, mrsPerNode[workerID], deadline)
if err != nil { if err != nil {
sn.registerMetricNamesErrors.Inc() sn.registerMetricNamesErrors.Inc()
} }
@ -693,7 +693,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
deletedCount int deletedCount int
err error err error
} }
snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.deleteSeriesRequests.Inc() sn.deleteSeriesRequests.Inc()
deletedCount, err := sn.deleteSeries(qt, requestData, deadline) deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
if err != nil { if err != nil {
@ -734,7 +734,7 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
labelNames []string labelNames []string
err error err error
} }
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.labelNamesRequests.Inc() sn.labelNamesRequests.Inc()
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline) labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
if err != nil { if err != nil {
@ -836,7 +836,7 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
labelValues []string labelValues []string
err error err error
} }
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.labelValuesRequests.Inc() sn.labelValuesRequests.Inc()
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline) labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
if err != nil { if err != nil {
@ -918,7 +918,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP
suffixes []string suffixes []string
err error err error
} }
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.tagValueSuffixesRequests.Inc() sn.tagValueSuffixesRequests.Inc()
suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline) suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline)
if err != nil { if err != nil {
@ -982,7 +982,7 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
status *storage.TSDBStatus status *storage.TSDBStatus
err error err error
} }
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.tsdbStatusRequests.Inc() sn.tsdbStatusRequests.Inc()
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline) status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
if err != nil { if err != nil {
@ -1087,7 +1087,7 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia
n uint64 n uint64
err error err error
} }
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.seriesCountRequests.Inc() sn.seriesCountRequests.Inc()
n, err := sn.getSeriesCount(qt, accountID, projectID, deadline) n, err := sn.getSeriesCount(qt, accountID, projectID, deadline)
if err != nil { if err != nil {
@ -1139,16 +1139,16 @@ func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper {
} }
} }
func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerIdx int) error { func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerID uint) error {
bb := tmpBufPool.Get() bb := tmpBufPool.Get()
bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block) bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block)
addr, err := tbfw.tbfs[workerIdx].WriteBlockData(bb.B, workerIdx) addr, err := tbfw.tbfs[workerID].WriteBlockData(bb.B, workerID)
tmpBufPool.Put(bb) tmpBufPool.Put(bb)
if err != nil { if err != nil {
return err return err
} }
metricName := mb.MetricName metricName := mb.MetricName
m := tbfw.ms[workerIdx] m := tbfw.ms[workerID]
addrs := m[string(metricName)] addrs := m[string(metricName)]
addrs = append(addrs, addr) addrs = append(addrs, addr)
if len(addrs) > 1 { if len(addrs) > 1 {
@ -1156,11 +1156,11 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
} else { } else {
// An optimization for big number of time series with long names: store only a single copy of metricNameStr // An optimization for big number of time series with long names: store only a single copy of metricNameStr
// in both tbfw.orderedMetricNamess and tbfw.ms. // in both tbfw.orderedMetricNamess and tbfw.ms.
orderedMetricNames := tbfw.orderedMetricNamess[workerIdx] orderedMetricNames := tbfw.orderedMetricNamess[workerID]
orderedMetricNames = append(orderedMetricNames, string(metricName)) orderedMetricNames = append(orderedMetricNames, string(metricName))
metricNameStr := orderedMetricNames[len(orderedMetricNames)-1] metricNameStr := orderedMetricNames[len(orderedMetricNames)-1]
m[metricNameStr] = addrs m[metricNameStr] = addrs
tbfw.orderedMetricNamess[workerIdx] = orderedMetricNames tbfw.orderedMetricNamess[workerID] = orderedMetricNames
} }
return nil return nil
} }
@ -1200,7 +1200,7 @@ var metricNamePool = &sync.Pool{
// It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block. // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block.
// It is the responsibility of f to filter blocks according to the given tr. // It is the responsibility of f to filter blocks according to the given tr.
func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline, func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline,
f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error { f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error) error {
qt = qt.NewChild("export blocks: %s", sq) qt = qt.NewChild("export blocks: %s", sq)
defer qt.Done() defer qt.Done()
if deadline.Exceeded() { if deadline.Exceeded() {
@ -1212,18 +1212,18 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
} }
blocksRead := newPerNodeCounter() blocksRead := newPerNodeCounter()
samples := newPerNodeCounter() samples := newPerNodeCounter()
processBlock := func(mb *storage.MetricBlock, workerIdx int) error { processBlock := func(mb *storage.MetricBlock, workerID uint) error {
mn := metricNamePool.Get().(*storage.MetricName) mn := metricNamePool.Get().(*storage.MetricName)
if err := mn.Unmarshal(mb.MetricName); err != nil { if err := mn.Unmarshal(mb.MetricName); err != nil {
return fmt.Errorf("cannot unmarshal metricName: %w", err) return fmt.Errorf("cannot unmarshal metricName: %w", err)
} }
if err := f(mn, &mb.Block, tr); err != nil { if err := f(mn, &mb.Block, tr, workerID); err != nil {
return err return err
} }
mn.Reset() mn.Reset()
metricNamePool.Put(mn) metricNamePool.Put(mn)
blocksRead.Add(workerIdx, 1) blocksRead.Add(workerID, 1)
samples.Add(workerIdx, uint64(mb.Block.RowsCount())) samples.Add(workerID, uint64(mb.Block.RowsCount()))
return nil return nil
} }
_, err := ProcessBlocks(qt, true, sq, processBlock, deadline) _, err := ProcessBlocks(qt, true, sq, processBlock, deadline)
@ -1250,7 +1250,7 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
metricNames []string metricNames []string
err error err error
} }
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.searchMetricNamesRequests.Inc() sn.searchMetricNamesRequests.Inc()
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline) metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
if err != nil { if err != nil {
@ -1307,15 +1307,15 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
blocksRead := newPerNodeCounter() blocksRead := newPerNodeCounter()
samples := newPerNodeCounter() samples := newPerNodeCounter()
maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(storageNodes)) maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(storageNodes))
processBlock := func(mb *storage.MetricBlock, workerIdx int) error { processBlock := func(mb *storage.MetricBlock, workerID uint) error {
blocksRead.Add(workerIdx, 1) blocksRead.Add(workerID, 1)
n := samples.Add(workerIdx, uint64(mb.Block.RowsCount())) n := samples.Add(workerID, uint64(mb.Block.RowsCount()))
if *maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples.GetTotal() > uint64(*maxSamplesPerQuery) { if *maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples.GetTotal() > uint64(*maxSamplesPerQuery) {
return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: "+ return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: "+
"to increase the -search.maxSamplesPerQuery; to reduce time range for the query; "+ "to increase the -search.maxSamplesPerQuery; to reduce time range for the query; "+
"to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery) "to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery)
} }
if err := tbfw.RegisterAndWriteBlock(mb, workerIdx); err != nil { if err := tbfw.RegisterAndWriteBlock(mb, workerID); err != nil {
return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err) return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err)
} }
return nil return nil
@ -1348,7 +1348,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
// ProcessBlocks calls processBlock per each block matching the given sq. // ProcessBlocks calls processBlock per each block matching the given sq.
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
processBlock func(mb *storage.MetricBlock, workerIdx int) error, deadline searchutils.Deadline) (bool, error) { processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) {
requestData := sq.Marshal(nil) requestData := sq.Marshal(nil)
// Make sure that processBlock is no longer called after the exit from ProcessBlocks() function. // Make sure that processBlock is no longer called after the exit from ProcessBlocks() function.
@ -1371,8 +1371,8 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
_ [128 - unsafe.Sizeof(wgStruct{})%128]byte _ [128 - unsafe.Sizeof(wgStruct{})%128]byte
} }
wgs := make([]wgWithPadding, len(storageNodes)) wgs := make([]wgWithPadding, len(storageNodes))
f := func(mb *storage.MetricBlock, workerIdx int) error { f := func(mb *storage.MetricBlock, workerID uint) error {
muwg := &wgs[workerIdx] muwg := &wgs[workerID]
muwg.mu.Lock() muwg.mu.Lock()
if muwg.stop { if muwg.stop {
muwg.mu.Unlock() muwg.mu.Unlock()
@ -1380,15 +1380,15 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage
} }
muwg.wg.Add(1) muwg.wg.Add(1)
muwg.mu.Unlock() muwg.mu.Unlock()
err := processBlock(mb, workerIdx) err := processBlock(mb, workerID)
muwg.wg.Done() muwg.wg.Done()
return err return err
} }
// Send the query to all the storage nodes in parallel. // Send the query to all the storage nodes in parallel.
snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} {
sn.searchRequests.Inc() sn.searchRequests.Inc()
err := sn.processSearchQuery(qt, requestData, f, workerIdx, deadline) err := sn.processSearchQuery(qt, requestData, f, workerID, deadline)
if err != nil { if err != nil {
sn.searchErrors.Inc() sn.searchErrors.Inc()
err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err)
@ -1422,15 +1422,15 @@ type storageNodesRequest struct {
resultsCh chan interface{} resultsCh chan interface{}
} }
func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{}) *storageNodesRequest { func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{}) *storageNodesRequest {
resultsCh := make(chan interface{}, len(storageNodes)) resultsCh := make(chan interface{}, len(storageNodes))
for idx, sn := range storageNodes { for idx, sn := range storageNodes {
qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr()) qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr())
go func(workerIdx int, sn *storageNode) { go func(workerID uint, sn *storageNode) {
result := f(qtChild, workerIdx, sn) result := f(qtChild, workerID, sn)
resultsCh <- result resultsCh <- result
qtChild.Done() qtChild.Done()
}(idx, sn) }(uint(idx), sn)
} }
return &storageNodesRequest{ return &storageNodesRequest{
denyPartialResponse: denyPartialResponse, denyPartialResponse: denyPartialResponse,
@ -1697,10 +1697,10 @@ func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestD
return metricNames, nil return metricNames, nil
} }
func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerIdx int) error, func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerID uint) error,
workerIdx int, deadline searchutils.Deadline) error { workerID uint, deadline searchutils.Deadline) error {
f := func(bc *handshake.BufferedConn) error { f := func(bc *handshake.BufferedConn) error {
if err := sn.processSearchQueryOnConn(bc, requestData, processBlock, workerIdx); err != nil { if err := sn.processSearchQueryOnConn(bc, requestData, processBlock, workerID); err != nil {
return err return err
} }
return nil return nil
@ -2201,7 +2201,7 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn
const maxMetricNameSize = 64 * 1024 const maxMetricNameSize = 64 * 1024
func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte,
processBlock func(mb *storage.MetricBlock, workerIdx int) error, workerIdx int) error { processBlock func(mb *storage.MetricBlock, workerID uint) error, workerID uint) error {
// Send the request to sn. // Send the request to sn.
if err := writeBytes(bc, requestData); err != nil { if err := writeBytes(bc, requestData); err != nil {
return fmt.Errorf("cannot write requestData: %w", err) return fmt.Errorf("cannot write requestData: %w", err)
@ -2241,7 +2241,7 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ
blocksRead++ blocksRead++
sn.metricBlocksRead.Inc() sn.metricBlocksRead.Inc()
sn.metricRowsRead.Add(mb.Block.RowsCount()) sn.metricRowsRead.Add(mb.Block.RowsCount())
if err := processBlock(&mb, workerIdx); err != nil { if err := processBlock(&mb, workerID); err != nil {
return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err) return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err)
} }
} }
@ -2440,7 +2440,7 @@ func newPerNodeCounter() *perNodeCounter {
} }
} }
func (pnc *perNodeCounter) Add(nodeIdx int, n uint64) uint64 { func (pnc *perNodeCounter) Add(nodeIdx uint, n uint64) uint64 {
return atomic.AddUint64(&pnc.ns[nodeIdx].n, n) return atomic.AddUint64(&pnc.ns[nodeIdx].n, n)
} }

View file

@ -78,7 +78,7 @@ var tmpBlocksFilePool sync.Pool
type tmpBlockAddr struct { type tmpBlockAddr struct {
offset uint64 offset uint64
size int size int
tbfIdx int tbfIdx uint
} }
func (addr tmpBlockAddr) String() string { func (addr tmpBlockAddr) String() string {
@ -96,7 +96,7 @@ var (
// //
// It returns errors since the operation may fail on space shortage // It returns errors since the operation may fail on space shortage
// and this must be handled. // and this must be handled.
func (tbf *tmpBlocksFile) WriteBlockData(b []byte, tbfIdx int) (tmpBlockAddr, error) { func (tbf *tmpBlocksFile) WriteBlockData(b []byte, tbfIdx uint) (tmpBlockAddr, error) {
var addr tmpBlockAddr var addr tmpBlockAddr
addr.tbfIdx = tbfIdx addr.tbfIdx = tbfIdx
addr.offset = tbf.offset addr.offset = tbf.offset

View file

@ -126,49 +126,24 @@
} }
{% endfunc %} {% endfunc %}
{% func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %} {% func ExportPromAPIHeader() %}
{ {
{% code
lines := 0
bytesTotal := 0
%}
"status":"success", "status":"success",
"data":{ "data":{
"resultType":"matrix", "resultType":"matrix",
"result":[ "result":[
{% code bb, ok := <-resultsCh %} {% endfunc %}
{% if ok %}
{%z= bb.B %} {% func ExportPromAPIFooter(qt *querytracer.Tracer) %}
{% code
lines++
bytesTotal += len(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
%}
{% for bb := range resultsCh %}
,{%z= bb.B %}
{% code
lines++
bytesTotal += len(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
%}
{% endfor %}
{% endif %}
] ]
} }
{% code {% code
qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal) qt.Donef("export format=promapi")
%} %}
{%= dumpQueryTrace(qt) %} {%= dumpQueryTrace(qt) %}
} }
{% endfunc %} {% endfunc %}
{% func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %}
{% for bb := range resultsCh %}
{%z= bb.B %}
{% code quicktemplate.ReleaseByteBuffer(bb) %}
{% endfor %}
{% endfunc %}
{% func prometheusMetricName(mn *storage.MetricName) %} {% func prometheusMetricName(mn *storage.MetricName) %}
{%z= mn.MetricGroup %} {%z= mn.MetricGroup %}
{% if len(mn.Tags) > 0 %} {% if len(mn.Tags) > 0 %}

View file

@ -415,184 +415,142 @@ func ExportPromAPILine(xb *exportBlock) string {
} }
//line app/vmselect/prometheus/export.qtpl:129 //line app/vmselect/prometheus/export.qtpl:129
func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { func StreamExportPromAPIHeader(qw422016 *qt422016.Writer) {
//line app/vmselect/prometheus/export.qtpl:129 //line app/vmselect/prometheus/export.qtpl:129
qw422016.N().S(`{`) qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`)
//line app/vmselect/prometheus/export.qtpl:132 //line app/vmselect/prometheus/export.qtpl:135
lines := 0 }
bytesTotal := 0
//line app/vmselect/prometheus/export.qtpl:134 //line app/vmselect/prometheus/export.qtpl:135
qw422016.N().S(`"status":"success","data":{"resultType":"matrix","result":[`) func WriteExportPromAPIHeader(qq422016 qtio422016.Writer) {
//line app/vmselect/prometheus/export.qtpl:139 //line app/vmselect/prometheus/export.qtpl:135
bb, ok := <-resultsCh qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:135
StreamExportPromAPIHeader(qw422016)
//line app/vmselect/prometheus/export.qtpl:135
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:135
}
//line app/vmselect/prometheus/export.qtpl:140 //line app/vmselect/prometheus/export.qtpl:135
if ok { func ExportPromAPIHeader() string {
//line app/vmselect/prometheus/export.qtpl:141 //line app/vmselect/prometheus/export.qtpl:135
qw422016.N().Z(bb.B) qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:143 //line app/vmselect/prometheus/export.qtpl:135
lines++ WriteExportPromAPIHeader(qb422016)
bytesTotal += len(bb.B) //line app/vmselect/prometheus/export.qtpl:135
quicktemplate.ReleaseByteBuffer(bb) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:135
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:135
return qs422016
//line app/vmselect/prometheus/export.qtpl:135
}
//line app/vmselect/prometheus/export.qtpl:147 //line app/vmselect/prometheus/export.qtpl:137
for bb := range resultsCh { func StreamExportPromAPIFooter(qw422016 *qt422016.Writer, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/export.qtpl:147 //line app/vmselect/prometheus/export.qtpl:137
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:148
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:150
lines++
bytesTotal += len(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:154
}
//line app/vmselect/prometheus/export.qtpl:155
}
//line app/vmselect/prometheus/export.qtpl:155
qw422016.N().S(`]}`) qw422016.N().S(`]}`)
//line app/vmselect/prometheus/export.qtpl:159 //line app/vmselect/prometheus/export.qtpl:141
qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal) qt.Donef("export format=promapi")
//line app/vmselect/prometheus/export.qtpl:161 //line app/vmselect/prometheus/export.qtpl:143
streamdumpQueryTrace(qw422016, qt) streamdumpQueryTrace(qw422016, qt)
//line app/vmselect/prometheus/export.qtpl:161 //line app/vmselect/prometheus/export.qtpl:143
qw422016.N().S(`}`) qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
} }
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { func WriteExportPromAPIFooter(qq422016 qtio422016.Writer, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
StreamExportPromAPIResponse(qw422016, resultsCh, qt) StreamExportPromAPIFooter(qw422016, qt)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
} }
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string { func ExportPromAPIFooter(qt *querytracer.Tracer) string {
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
WriteExportPromAPIResponse(qb422016, resultsCh, qt) WriteExportPromAPIFooter(qb422016, qt)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:163 //line app/vmselect/prometheus/export.qtpl:145
} }
//line app/vmselect/prometheus/export.qtpl:165 //line app/vmselect/prometheus/export.qtpl:147
func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/export.qtpl:166
for bb := range resultsCh {
//line app/vmselect/prometheus/export.qtpl:167
qw422016.N().Z(bb.B)
//line app/vmselect/prometheus/export.qtpl:168
quicktemplate.ReleaseByteBuffer(bb)
//line app/vmselect/prometheus/export.qtpl:169
}
//line app/vmselect/prometheus/export.qtpl:170
}
//line app/vmselect/prometheus/export.qtpl:170
func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/export.qtpl:170
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:170
StreamExportStdResponse(qw422016, resultsCh, qt)
//line app/vmselect/prometheus/export.qtpl:170
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:170
}
//line app/vmselect/prometheus/export.qtpl:170
func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string {
//line app/vmselect/prometheus/export.qtpl:170
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:170
WriteExportStdResponse(qb422016, resultsCh, qt)
//line app/vmselect/prometheus/export.qtpl:170
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:170
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:170
return qs422016
//line app/vmselect/prometheus/export.qtpl:170
}
//line app/vmselect/prometheus/export.qtpl:172
func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) { func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:173 //line app/vmselect/prometheus/export.qtpl:148
qw422016.N().Z(mn.MetricGroup) qw422016.N().Z(mn.MetricGroup)
//line app/vmselect/prometheus/export.qtpl:174 //line app/vmselect/prometheus/export.qtpl:149
if len(mn.Tags) > 0 { if len(mn.Tags) > 0 {
//line app/vmselect/prometheus/export.qtpl:174 //line app/vmselect/prometheus/export.qtpl:149
qw422016.N().S(`{`) qw422016.N().S(`{`)
//line app/vmselect/prometheus/export.qtpl:176 //line app/vmselect/prometheus/export.qtpl:151
tags := mn.Tags tags := mn.Tags
//line app/vmselect/prometheus/export.qtpl:177 //line app/vmselect/prometheus/export.qtpl:152
qw422016.N().Z(tags[0].Key) qw422016.N().Z(tags[0].Key)
//line app/vmselect/prometheus/export.qtpl:177 //line app/vmselect/prometheus/export.qtpl:152
qw422016.N().S(`=`) qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:177 //line app/vmselect/prometheus/export.qtpl:152
qw422016.N().QZ(tags[0].Value) qw422016.N().QZ(tags[0].Value)
//line app/vmselect/prometheus/export.qtpl:178 //line app/vmselect/prometheus/export.qtpl:153
tags = tags[1:] tags = tags[1:]
//line app/vmselect/prometheus/export.qtpl:179 //line app/vmselect/prometheus/export.qtpl:154
for i := range tags { for i := range tags {
//line app/vmselect/prometheus/export.qtpl:180 //line app/vmselect/prometheus/export.qtpl:155
tag := &tags[i] tag := &tags[i]
//line app/vmselect/prometheus/export.qtpl:180 //line app/vmselect/prometheus/export.qtpl:155
qw422016.N().S(`,`) qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:181 //line app/vmselect/prometheus/export.qtpl:156
qw422016.N().Z(tag.Key) qw422016.N().Z(tag.Key)
//line app/vmselect/prometheus/export.qtpl:181 //line app/vmselect/prometheus/export.qtpl:156
qw422016.N().S(`=`) qw422016.N().S(`=`)
//line app/vmselect/prometheus/export.qtpl:181 //line app/vmselect/prometheus/export.qtpl:156
qw422016.N().QZ(tag.Value) qw422016.N().QZ(tag.Value)
//line app/vmselect/prometheus/export.qtpl:182 //line app/vmselect/prometheus/export.qtpl:157
} }
//line app/vmselect/prometheus/export.qtpl:182 //line app/vmselect/prometheus/export.qtpl:157
qw422016.N().S(`}`) qw422016.N().S(`}`)
//line app/vmselect/prometheus/export.qtpl:184 //line app/vmselect/prometheus/export.qtpl:159
} }
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
} }
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) { func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) {
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
qw422016 := qt422016.AcquireWriter(qq422016) qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
streamprometheusMetricName(qw422016, mn) streamprometheusMetricName(qw422016, mn)
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
qt422016.ReleaseWriter(qw422016) qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
} }
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
func prometheusMetricName(mn *storage.MetricName) string { func prometheusMetricName(mn *storage.MetricName) string {
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
qb422016 := qt422016.AcquireByteBuffer() qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
writeprometheusMetricName(qb422016, mn) writeprometheusMetricName(qb422016, mn)
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
qs422016 := string(qb422016.B) qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
qt422016.ReleaseByteBuffer(qb422016) qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
return qs422016 return qs422016
//line app/vmselect/prometheus/export.qtpl:185 //line app/vmselect/prometheus/export.qtpl:160
} }

View file

@ -6,9 +6,11 @@ import (
"math" "math"
"net" "net"
"net/http" "net/http"
"runtime"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
@ -18,7 +20,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@ -28,7 +29,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
"github.com/valyala/fastjson/fastfloat" "github.com/valyala/fastjson/fastfloat"
"github.com/valyala/quicktemplate"
) )
var ( var (
@ -92,37 +92,19 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.Header().Set("Content-Type", "text/plain; charset=utf-8")
bw := bufferedwriter.Get(w) bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
var m sync.Map sw := newScalableWriter(bw)
err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error { err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error {
if err := bw.Error(); err != nil { if err := bw.Error(); err != nil {
return err return err
} }
v, ok := m.Load(workerID) bb := sw.getBuffer(workerID)
if !ok {
v = &bytesutil.ByteBuffer{}
m.Store(workerID, v)
}
bb := v.(*bytesutil.ByteBuffer)
WriteFederate(bb, rs) WriteFederate(bb, rs)
if len(bb.B) < 1024*1024 { return sw.maybeFlushBuffer(bb)
return nil
}
_, err := bw.Write(bb.B)
bb.Reset()
return err
}) })
if err != nil { if err != nil {
return fmt.Errorf("error during sending data to remote client: %w", err) return fmt.Errorf("error during sending data to remote client: %w", err)
} }
m.Range(func(k, v interface{}) bool { return sw.flush()
bb := v.(*bytesutil.ByteBuffer)
_, err := bw.Write(bb.B)
return err == nil
})
if err := bw.Flush(); err != nil {
return err
}
return nil
} }
var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`)
@ -147,15 +129,14 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
w.Header().Set("Content-Type", "text/csv; charset=utf-8") w.Header().Set("Content-Type", "text/csv; charset=utf-8")
bw := bufferedwriter.Get(w) bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
sw := newScalableWriter(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) writeCSVLine := func(xb *exportBlock, workerID uint) error {
writeCSVLine := func(xb *exportBlock) {
if len(xb.timestamps) == 0 { if len(xb.timestamps) == 0 {
return return nil
} }
bb := quicktemplate.AcquireByteBuffer() bb := sw.getBuffer(workerID)
WriteExportCSVLine(bb, xb, fieldNames) WriteExportCSVLine(bb, xb, fieldNames)
resultsCh <- bb return sw.maybeFlushBuffer(bb)
} }
doneCh := make(chan error, 1) doneCh := make(chan error, 1)
if !reduceMemUsage { if !reduceMemUsage {
@ -175,17 +156,18 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
xb.mn = &rs.MetricName xb.mn = &rs.MetricName
xb.timestamps = rs.Timestamps xb.timestamps = rs.Timestamps
xb.values = rs.Values xb.values = rs.Values
writeCSVLine(xb) if err := writeCSVLine(xb, workerID); err != nil {
return err
}
xb.reset() xb.reset()
exportBlockPool.Put(xb) exportBlockPool.Put(xb)
return nil return nil
}) })
close(resultsCh)
doneCh <- err doneCh <- err
}() }()
} else { } else {
go func() { go func() {
err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
if err := bw.Error(); err != nil { if err := bw.Error(); err != nil {
return err return err
} }
@ -195,29 +177,21 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter
xb := exportBlockPool.Get().(*exportBlock) xb := exportBlockPool.Get().(*exportBlock)
xb.mn = mn xb.mn = mn
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
writeCSVLine(xb) if err := writeCSVLine(xb, workerID); err != nil {
return err
}
xb.reset() xb.reset()
exportBlockPool.Put(xb) exportBlockPool.Put(xb)
return nil return nil
}) })
close(resultsCh)
doneCh <- err doneCh <- err
}() }()
} }
// Consume all the data from resultsCh.
for bb := range resultsCh {
// Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above.
_, _ = bw.Write(bb.B)
quicktemplate.ReleaseByteBuffer(bb)
}
if err := bw.Flush(); err != nil {
return err
}
err = <-doneCh err = <-doneCh
if err != nil { if err != nil {
return fmt.Errorf("error during sending the exported csv data to remote client: %w", err) return fmt.Errorf("error during sending the exported csv data to remote client: %w", err)
} }
return nil return sw.flush()
} }
var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`) var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`)
@ -235,6 +209,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
w.Header().Set("Content-Type", "VictoriaMetrics/native") w.Header().Set("Content-Type", "VictoriaMetrics/native")
bw := bufferedwriter.Get(w) bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw) defer bufferedwriter.Put(bw)
sw := newScalableWriter(bw)
// Marshal tr // Marshal tr
trBuf := make([]byte, 0, 16) trBuf := make([]byte, 0, 16)
@ -243,13 +218,13 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
_, _ = bw.Write(trBuf) _, _ = bw.Write(trBuf)
// Marshal native blocks. // Marshal native blocks.
err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
if err := bw.Error(); err != nil { if err := bw.Error(); err != nil {
return err return err
} }
dstBuf := bbPool.Get() bb := sw.getBuffer(workerID)
dst := bb.B
tmpBuf := bbPool.Get() tmpBuf := bbPool.Get()
dst := dstBuf.B
tmp := tmpBuf.B tmp := tmpBuf.B
// Marshal mn // Marshal mn
@ -265,19 +240,13 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri
tmpBuf.B = tmp tmpBuf.B = tmp
bbPool.Put(tmpBuf) bbPool.Put(tmpBuf)
_, err := bw.Write(dst) bb.B = dst
return sw.maybeFlushBuffer(bb)
dstBuf.B = dst
bbPool.Put(dstBuf)
return err
}) })
if err != nil { if err != nil {
return fmt.Errorf("error during sending native data to remote client: %w", err) return fmt.Errorf("error during sending native data to remote client: %w", err)
} }
if err := bw.Flush(); err != nil { return sw.flush()
return fmt.Errorf("error during flushing native data to remote client: %w", err)
}
return nil
} }
var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`) var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`)
@ -304,31 +273,48 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r
var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`)
func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error { func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error {
writeResponseFunc := WriteExportStdResponse bw := bufferedwriter.Get(w)
writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { defer bufferedwriter.Put(bw)
bb := quicktemplate.AcquireByteBuffer() sw := newScalableWriter(bw)
writeLineFunc := func(xb *exportBlock, workerID uint) error {
bb := sw.getBuffer(workerID)
WriteExportJSONLine(bb, xb) WriteExportJSONLine(bb, xb)
resultsCh <- bb return sw.maybeFlushBuffer(bb)
} }
contentType := "application/stream+json; charset=utf-8" contentType := "application/stream+json; charset=utf-8"
if format == "prometheus" { if format == "prometheus" {
contentType = "text/plain; charset=utf-8" contentType = "text/plain; charset=utf-8"
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { writeLineFunc = func(xb *exportBlock, workerID uint) error {
bb := quicktemplate.AcquireByteBuffer() bb := sw.getBuffer(workerID)
WriteExportPrometheusLine(bb, xb) WriteExportPrometheusLine(bb, xb)
resultsCh <- bb return sw.maybeFlushBuffer(bb)
} }
} else if format == "promapi" { } else if format == "promapi" {
writeResponseFunc = WriteExportPromAPIResponse WriteExportPromAPIHeader(bw)
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { firstLineOnce := uint32(0)
bb := quicktemplate.AcquireByteBuffer() firstLineSent := uint32(0)
writeLineFunc = func(xb *exportBlock, workerID uint) error {
bb := sw.getBuffer(workerID)
if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) {
// Send the first line to sw.bw
WriteExportPromAPILine(bb, xb)
_, err := sw.bw.Write(bb.B)
bb.Reset()
atomic.StoreUint32(&firstLineSent, 1)
return err
}
for atomic.LoadUint32(&firstLineSent) == 0 {
// Busy wait until the first line is sent to sw.bw
runtime.Gosched()
}
bb.B = append(bb.B, ',')
WriteExportPromAPILine(bb, xb) WriteExportPromAPILine(bb, xb)
resultsCh <- bb return sw.maybeFlushBuffer(bb)
} }
} }
if maxRowsPerLine > 0 { if maxRowsPerLine > 0 {
writeLineFuncOrig := writeLineFunc writeLineFuncOrig := writeLineFunc
writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { writeLineFunc = func(xb *exportBlock, workerID uint) error {
valuesOrig := xb.values valuesOrig := xb.values
timestampsOrig := xb.timestamps timestampsOrig := xb.timestamps
values := valuesOrig values := valuesOrig
@ -349,19 +335,19 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
} }
xb.values = valuesChunk xb.values = valuesChunk
xb.timestamps = timestampsChunk xb.timestamps = timestampsChunk
writeLineFuncOrig(xb, resultsCh) if err := writeLineFuncOrig(xb, workerID); err != nil {
return err
}
} }
xb.values = valuesOrig xb.values = valuesOrig
xb.timestamps = timestampsOrig xb.timestamps = timestampsOrig
return nil
} }
} }
sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries) sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries)
w.Header().Set("Content-Type", contentType) w.Header().Set("Content-Type", contentType)
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs())
doneCh := make(chan error, 1) doneCh := make(chan error, 1)
if !reduceMemUsage { if !reduceMemUsage {
// Unconditionally deny partial response for the exported data, // Unconditionally deny partial response for the exported data,
@ -381,19 +367,20 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
xb.mn = &rs.MetricName xb.mn = &rs.MetricName
xb.timestamps = rs.Timestamps xb.timestamps = rs.Timestamps
xb.values = rs.Values xb.values = rs.Values
writeLineFunc(xb, resultsCh) if err := writeLineFunc(xb, workerID); err != nil {
return err
}
xb.reset() xb.reset()
exportBlockPool.Put(xb) exportBlockPool.Put(xb)
return nil return nil
}) })
qtChild.Done() qtChild.Done()
close(resultsCh)
doneCh <- err doneCh <- err
}() }()
} else { } else {
qtChild := qt.NewChild("background export format=%s", format) qtChild := qt.NewChild("background export format=%s", format)
go func() { go func() {
err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error {
if err := bw.Error(); err != nil { if err := bw.Error(); err != nil {
return err return err
} }
@ -404,26 +391,30 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter
xb.mn = mn xb.mn = mn
xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr)
if len(xb.timestamps) > 0 { if len(xb.timestamps) > 0 {
writeLineFunc(xb, resultsCh) if err := writeLineFunc(xb, workerID); err != nil {
return err
}
} }
xb.reset() xb.reset()
exportBlockPool.Put(xb) exportBlockPool.Put(xb)
return nil return nil
}) })
qtChild.Done() qtChild.Done()
close(resultsCh)
doneCh <- err doneCh <- err
}() }()
} }
// writeResponseFunc must consume all the data from resultsCh.
writeResponseFunc(bw, resultsCh, qt)
if err := bw.Flush(); err != nil {
return err
}
err := <-doneCh err := <-doneCh
if err != nil { if err != nil {
return fmt.Errorf("error during sending the data to remote client: %w", err) return fmt.Errorf("cannot send data to remote client: %w", err)
}
if err := sw.flush(); err != nil {
return fmt.Errorf("cannot send data to remote client: %w", err)
}
if format == "promapi" {
WriteExportPromAPIFooter(bw, qt)
}
if err := bw.Flush(); err != nil {
return err
} }
return nil return nil
} }
@ -1212,3 +1203,41 @@ func getCommonParams(r *http.Request, startTime time.Time, requireNonEmptyMatch
} }
return cp, nil return cp, nil
} }
type scalableWriter struct {
bw *bufferedwriter.Writer
m sync.Map
}
func newScalableWriter(bw *bufferedwriter.Writer) *scalableWriter {
return &scalableWriter{
bw: bw,
}
}
func (sw *scalableWriter) getBuffer(workerID uint) *bytesutil.ByteBuffer {
v, ok := sw.m.Load(workerID)
if !ok {
v = &bytesutil.ByteBuffer{}
sw.m.Store(workerID, v)
}
return v.(*bytesutil.ByteBuffer)
}
func (sw *scalableWriter) maybeFlushBuffer(bb *bytesutil.ByteBuffer) error {
if len(bb.B) < 1024*1024 {
return nil
}
_, err := sw.bw.Write(bb.B)
bb.Reset()
return err
}
func (sw *scalableWriter) flush() error {
sw.m.Range(func(k, v interface{}) bool {
bb := v.(*bytesutil.ByteBuffer)
_, err := sw.bw.Write(bb.B)
return err == nil
})
return sw.bw.Flush()
}

View file

@ -27,6 +27,7 @@ See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#m
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970). * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970).
* FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x for non-trivial `regex` values such as `([^:]+):.+`, which can be used for extracting a `host` part from `host:port` label value. * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x for non-trivial `regex` values such as `([^:]+):.+`, which can be used for extracting a `host` part from `host:port` label value.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): improve performance by up to 4x for queries containing non-trivial `regex` filters such as `{path=~"/foo/.+|/bar"}`. * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): improve performance by up to 4x for queries containing non-trivial `regex` filters such as `{path=~"/foo/.+|/bar"}`.
* FEATURE: improve performance scalability on systems with many CPU cores for [/federate](https://docs.victoriametrics.com/#federation) and [/api/v1/export/...](https://docs.victoriametrics.com/#how-to-export-time-series) endpoints.
* FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105). * FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105).
* FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113). * FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113).
* FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102). * FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102).