From 43bdd96a6ef12a4a541628af376086223f4ee954 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sat, 1 Oct 2022 22:05:43 +0300 Subject: [PATCH] app/vmselect: improve performance scalability on multi-CPU systems for `/api/v1/export/...` endpoints --- app/vmselect/clusternative/vmselect.go | 2 +- app/vmselect/netstorage/netstorage.go | 78 ++++---- app/vmselect/netstorage/tmp_blocks_file.go | 4 +- app/vmselect/prometheus/export.qtpl | 35 +--- app/vmselect/prometheus/export.qtpl.go | 208 ++++++++------------- app/vmselect/prometheus/prometheus.go | 199 +++++++++++--------- docs/CHANGELOG.md | 1 + 7 files changed, 245 insertions(+), 282 deletions(-) diff --git a/app/vmselect/clusternative/vmselect.go b/app/vmselect/clusternative/vmselect.go index 2748ea3e2..c03652217 100644 --- a/app/vmselect/clusternative/vmselect.go +++ b/app/vmselect/clusternative/vmselect.go @@ -112,7 +112,7 @@ func newBlockIterator(qt *querytracer.Tracer, denyPartialResponse bool, sq *stor bi.workCh = make(chan workItem, 16) bi.wg.Add(1) go func() { - _, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerIdx int) error { + _, err := netstorage.ProcessBlocks(qt, denyPartialResponse, sq, func(mb *storage.MetricBlock, workerID uint) error { wi := workItem{ mb: mb, doneCh: make(chan struct{}), diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 4a27f7021..978ccb15e 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -662,9 +662,9 @@ func RegisterMetricNames(qt *querytracer.Tracer, mrs []storage.MetricRow, deadli } // Push mrs to storage nodes in parallel. - snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.registerMetricNamesRequests.Inc() - err := sn.registerMetricNames(qt, mrsPerNode[workerIdx], deadline) + err := sn.registerMetricNames(qt, mrsPerNode[workerID], deadline) if err != nil { sn.registerMetricNamesErrors.Inc() } @@ -693,7 +693,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear deletedCount int err error } - snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, true, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.deleteSeriesRequests.Inc() deletedCount, err := sn.deleteSeries(qt, requestData, deadline) if err != nil { @@ -734,7 +734,7 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se labelNames []string err error } - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.labelNamesRequests.Inc() labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline) if err != nil { @@ -836,7 +836,7 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str labelValues []string err error } - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.labelValuesRequests.Inc() labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline) if err != nil { @@ -918,7 +918,7 @@ func TagValueSuffixes(qt *querytracer.Tracer, accountID, projectID uint32, denyP suffixes []string err error } - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.tagValueSuffixesRequests.Inc() suffixes, err := sn.getTagValueSuffixes(qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, deadline) if err != nil { @@ -982,7 +982,7 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se status *storage.TSDBStatus err error } - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.tsdbStatusRequests.Inc() status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline) if err != nil { @@ -1087,7 +1087,7 @@ func SeriesCount(qt *querytracer.Tracer, accountID, projectID uint32, denyPartia n uint64 err error } - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.seriesCountRequests.Inc() n, err := sn.getSeriesCount(qt, accountID, projectID, deadline) if err != nil { @@ -1139,16 +1139,16 @@ func newTmpBlocksFileWrapper() *tmpBlocksFileWrapper { } } -func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerIdx int) error { +func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, workerID uint) error { bb := tmpBufPool.Get() bb.B = storage.MarshalBlock(bb.B[:0], &mb.Block) - addr, err := tbfw.tbfs[workerIdx].WriteBlockData(bb.B, workerIdx) + addr, err := tbfw.tbfs[workerID].WriteBlockData(bb.B, workerID) tmpBufPool.Put(bb) if err != nil { return err } metricName := mb.MetricName - m := tbfw.ms[workerIdx] + m := tbfw.ms[workerID] addrs := m[string(metricName)] addrs = append(addrs, addr) if len(addrs) > 1 { @@ -1156,11 +1156,11 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, } else { // An optimization for big number of time series with long names: store only a single copy of metricNameStr // in both tbfw.orderedMetricNamess and tbfw.ms. - orderedMetricNames := tbfw.orderedMetricNamess[workerIdx] + orderedMetricNames := tbfw.orderedMetricNamess[workerID] orderedMetricNames = append(orderedMetricNames, string(metricName)) metricNameStr := orderedMetricNames[len(orderedMetricNames)-1] m[metricNameStr] = addrs - tbfw.orderedMetricNamess[workerIdx] = orderedMetricNames + tbfw.orderedMetricNamess[workerID] = orderedMetricNames } return nil } @@ -1200,7 +1200,7 @@ var metricNamePool = &sync.Pool{ // It is the responsibility of f to call b.UnmarshalData before reading timestamps and values from the block. // It is the responsibility of f to filter blocks according to the given tr. func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutils.Deadline, - f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error) error { + f func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error) error { qt = qt.NewChild("export blocks: %s", sq) defer qt.Done() if deadline.Exceeded() { @@ -1212,18 +1212,18 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear } blocksRead := newPerNodeCounter() samples := newPerNodeCounter() - processBlock := func(mb *storage.MetricBlock, workerIdx int) error { + processBlock := func(mb *storage.MetricBlock, workerID uint) error { mn := metricNamePool.Get().(*storage.MetricName) if err := mn.Unmarshal(mb.MetricName); err != nil { return fmt.Errorf("cannot unmarshal metricName: %w", err) } - if err := f(mn, &mb.Block, tr); err != nil { + if err := f(mn, &mb.Block, tr, workerID); err != nil { return err } mn.Reset() metricNamePool.Put(mn) - blocksRead.Add(workerIdx, 1) - samples.Add(workerIdx, uint64(mb.Block.RowsCount())) + blocksRead.Add(workerID, 1) + samples.Add(workerID, uint64(mb.Block.RowsCount())) return nil } _, err := ProcessBlocks(qt, true, sq, processBlock, deadline) @@ -1250,7 +1250,7 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto metricNames []string err error } - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.searchMetricNamesRequests.Inc() metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline) if err != nil { @@ -1307,15 +1307,15 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st blocksRead := newPerNodeCounter() samples := newPerNodeCounter() maxSamplesPerWorker := uint64(*maxSamplesPerQuery) / uint64(len(storageNodes)) - processBlock := func(mb *storage.MetricBlock, workerIdx int) error { - blocksRead.Add(workerIdx, 1) - n := samples.Add(workerIdx, uint64(mb.Block.RowsCount())) + processBlock := func(mb *storage.MetricBlock, workerID uint) error { + blocksRead.Add(workerID, 1) + n := samples.Add(workerID, uint64(mb.Block.RowsCount())) if *maxSamplesPerQuery > 0 && n > maxSamplesPerWorker && samples.GetTotal() > uint64(*maxSamplesPerQuery) { return fmt.Errorf("cannot select more than -search.maxSamplesPerQuery=%d samples; possible solutions: "+ "to increase the -search.maxSamplesPerQuery; to reduce time range for the query; "+ "to use more specific label filters in order to select lower number of series", *maxSamplesPerQuery) } - if err := tbfw.RegisterAndWriteBlock(mb, workerIdx); err != nil { + if err := tbfw.RegisterAndWriteBlock(mb, workerID); err != nil { return fmt.Errorf("cannot write MetricBlock to temporary blocks file: %w", err) } return nil @@ -1348,7 +1348,7 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st // ProcessBlocks calls processBlock per each block matching the given sq. func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, - processBlock func(mb *storage.MetricBlock, workerIdx int) error, deadline searchutils.Deadline) (bool, error) { + processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutils.Deadline) (bool, error) { requestData := sq.Marshal(nil) // Make sure that processBlock is no longer called after the exit from ProcessBlocks() function. @@ -1371,8 +1371,8 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage _ [128 - unsafe.Sizeof(wgStruct{})%128]byte } wgs := make([]wgWithPadding, len(storageNodes)) - f := func(mb *storage.MetricBlock, workerIdx int) error { - muwg := &wgs[workerIdx] + f := func(mb *storage.MetricBlock, workerID uint) error { + muwg := &wgs[workerID] muwg.mu.Lock() if muwg.stop { muwg.mu.Unlock() @@ -1380,15 +1380,15 @@ func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage } muwg.wg.Add(1) muwg.mu.Unlock() - err := processBlock(mb, workerIdx) + err := processBlock(mb, workerID) muwg.wg.Done() return err } // Send the query to all the storage nodes in parallel. - snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{} { + snr := startStorageNodesRequest(qt, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{} { sn.searchRequests.Inc() - err := sn.processSearchQuery(qt, requestData, f, workerIdx, deadline) + err := sn.processSearchQuery(qt, requestData, f, workerID, deadline) if err != nil { sn.searchErrors.Inc() err = fmt.Errorf("cannot perform search on vmstorage %s: %w", sn.connPool.Addr(), err) @@ -1422,15 +1422,15 @@ type storageNodesRequest struct { resultsCh chan interface{} } -func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerIdx int, sn *storageNode) interface{}) *storageNodesRequest { +func startStorageNodesRequest(qt *querytracer.Tracer, denyPartialResponse bool, f func(qt *querytracer.Tracer, workerID uint, sn *storageNode) interface{}) *storageNodesRequest { resultsCh := make(chan interface{}, len(storageNodes)) for idx, sn := range storageNodes { qtChild := qt.NewChild("rpc at vmstorage %s", sn.connPool.Addr()) - go func(workerIdx int, sn *storageNode) { - result := f(qtChild, workerIdx, sn) + go func(workerID uint, sn *storageNode) { + result := f(qtChild, workerID, sn) resultsCh <- result qtChild.Done() - }(idx, sn) + }(uint(idx), sn) } return &storageNodesRequest{ denyPartialResponse: denyPartialResponse, @@ -1697,10 +1697,10 @@ func (sn *storageNode) processSearchMetricNames(qt *querytracer.Tracer, requestD return metricNames, nil } -func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerIdx int) error, - workerIdx int, deadline searchutils.Deadline) error { +func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []byte, processBlock func(mb *storage.MetricBlock, workerID uint) error, + workerID uint, deadline searchutils.Deadline) error { f := func(bc *handshake.BufferedConn) error { - if err := sn.processSearchQueryOnConn(bc, requestData, processBlock, workerIdx); err != nil { + if err := sn.processSearchQueryOnConn(bc, requestData, processBlock, workerID); err != nil { return err } return nil @@ -2201,7 +2201,7 @@ func (sn *storageNode) processSearchMetricNamesOnConn(bc *handshake.BufferedConn const maxMetricNameSize = 64 * 1024 func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requestData []byte, - processBlock func(mb *storage.MetricBlock, workerIdx int) error, workerIdx int) error { + processBlock func(mb *storage.MetricBlock, workerID uint) error, workerID uint) error { // Send the request to sn. if err := writeBytes(bc, requestData); err != nil { return fmt.Errorf("cannot write requestData: %w", err) @@ -2241,7 +2241,7 @@ func (sn *storageNode) processSearchQueryOnConn(bc *handshake.BufferedConn, requ blocksRead++ sn.metricBlocksRead.Inc() sn.metricRowsRead.Add(mb.Block.RowsCount()) - if err := processBlock(&mb, workerIdx); err != nil { + if err := processBlock(&mb, workerID); err != nil { return fmt.Errorf("cannot process MetricBlock #%d: %w", blocksRead, err) } } @@ -2440,7 +2440,7 @@ func newPerNodeCounter() *perNodeCounter { } } -func (pnc *perNodeCounter) Add(nodeIdx int, n uint64) uint64 { +func (pnc *perNodeCounter) Add(nodeIdx uint, n uint64) uint64 { return atomic.AddUint64(&pnc.ns[nodeIdx].n, n) } diff --git a/app/vmselect/netstorage/tmp_blocks_file.go b/app/vmselect/netstorage/tmp_blocks_file.go index 83b09c7b8..9cbc1077f 100644 --- a/app/vmselect/netstorage/tmp_blocks_file.go +++ b/app/vmselect/netstorage/tmp_blocks_file.go @@ -78,7 +78,7 @@ var tmpBlocksFilePool sync.Pool type tmpBlockAddr struct { offset uint64 size int - tbfIdx int + tbfIdx uint } func (addr tmpBlockAddr) String() string { @@ -96,7 +96,7 @@ var ( // // It returns errors since the operation may fail on space shortage // and this must be handled. -func (tbf *tmpBlocksFile) WriteBlockData(b []byte, tbfIdx int) (tmpBlockAddr, error) { +func (tbf *tmpBlocksFile) WriteBlockData(b []byte, tbfIdx uint) (tmpBlockAddr, error) { var addr tmpBlockAddr addr.tbfIdx = tbfIdx addr.offset = tbf.offset diff --git a/app/vmselect/prometheus/export.qtpl b/app/vmselect/prometheus/export.qtpl index 90bbbd5f5..06f675e54 100644 --- a/app/vmselect/prometheus/export.qtpl +++ b/app/vmselect/prometheus/export.qtpl @@ -126,49 +126,24 @@ } {% endfunc %} -{% func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %} +{% func ExportPromAPIHeader() %} { - {% code - lines := 0 - bytesTotal := 0 - %} "status":"success", "data":{ "resultType":"matrix", "result":[ - {% code bb, ok := <-resultsCh %} - {% if ok %} - {%z= bb.B %} - {% code - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - %} - {% for bb := range resultsCh %} - ,{%z= bb.B %} - {% code - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - %} - {% endfor %} - {% endif %} +{% endfunc %} + +{% func ExportPromAPIFooter(qt *querytracer.Tracer) %} ] } {% code - qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal) + qt.Donef("export format=promapi") %} {%= dumpQueryTrace(qt) %} } {% endfunc %} -{% func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) %} - {% for bb := range resultsCh %} - {%z= bb.B %} - {% code quicktemplate.ReleaseByteBuffer(bb) %} - {% endfor %} -{% endfunc %} - {% func prometheusMetricName(mn *storage.MetricName) %} {%z= mn.MetricGroup %} {% if len(mn.Tags) > 0 %} diff --git a/app/vmselect/prometheus/export.qtpl.go b/app/vmselect/prometheus/export.qtpl.go index f3913dbaa..7b20c7fa0 100644 --- a/app/vmselect/prometheus/export.qtpl.go +++ b/app/vmselect/prometheus/export.qtpl.go @@ -415,184 +415,142 @@ func ExportPromAPILine(xb *exportBlock) string { } //line app/vmselect/prometheus/export.qtpl:129 -func StreamExportPromAPIResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { +func StreamExportPromAPIHeader(qw422016 *qt422016.Writer) { //line app/vmselect/prometheus/export.qtpl:129 - qw422016.N().S(`{`) -//line app/vmselect/prometheus/export.qtpl:132 - lines := 0 - bytesTotal := 0 + qw422016.N().S(`{"status":"success","data":{"resultType":"matrix","result":[`) +//line app/vmselect/prometheus/export.qtpl:135 +} -//line app/vmselect/prometheus/export.qtpl:134 - qw422016.N().S(`"status":"success","data":{"resultType":"matrix","result":[`) -//line app/vmselect/prometheus/export.qtpl:139 - bb, ok := <-resultsCh +//line app/vmselect/prometheus/export.qtpl:135 +func WriteExportPromAPIHeader(qq422016 qtio422016.Writer) { +//line app/vmselect/prometheus/export.qtpl:135 + qw422016 := qt422016.AcquireWriter(qq422016) +//line app/vmselect/prometheus/export.qtpl:135 + StreamExportPromAPIHeader(qw422016) +//line app/vmselect/prometheus/export.qtpl:135 + qt422016.ReleaseWriter(qw422016) +//line app/vmselect/prometheus/export.qtpl:135 +} -//line app/vmselect/prometheus/export.qtpl:140 - if ok { -//line app/vmselect/prometheus/export.qtpl:141 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:143 - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) +//line app/vmselect/prometheus/export.qtpl:135 +func ExportPromAPIHeader() string { +//line app/vmselect/prometheus/export.qtpl:135 + qb422016 := qt422016.AcquireByteBuffer() +//line app/vmselect/prometheus/export.qtpl:135 + WriteExportPromAPIHeader(qb422016) +//line app/vmselect/prometheus/export.qtpl:135 + qs422016 := string(qb422016.B) +//line app/vmselect/prometheus/export.qtpl:135 + qt422016.ReleaseByteBuffer(qb422016) +//line app/vmselect/prometheus/export.qtpl:135 + return qs422016 +//line app/vmselect/prometheus/export.qtpl:135 +} -//line app/vmselect/prometheus/export.qtpl:147 - for bb := range resultsCh { -//line app/vmselect/prometheus/export.qtpl:147 - qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:148 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:150 - lines++ - bytesTotal += len(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - -//line app/vmselect/prometheus/export.qtpl:154 - } -//line app/vmselect/prometheus/export.qtpl:155 - } -//line app/vmselect/prometheus/export.qtpl:155 +//line app/vmselect/prometheus/export.qtpl:137 +func StreamExportPromAPIFooter(qw422016 *qt422016.Writer, qt *querytracer.Tracer) { +//line app/vmselect/prometheus/export.qtpl:137 qw422016.N().S(`]}`) -//line app/vmselect/prometheus/export.qtpl:159 - qt.Donef("export format=promapi: lines=%d, bytes=%d", lines, bytesTotal) +//line app/vmselect/prometheus/export.qtpl:141 + qt.Donef("export format=promapi") -//line app/vmselect/prometheus/export.qtpl:161 +//line app/vmselect/prometheus/export.qtpl:143 streamdumpQueryTrace(qw422016, qt) -//line app/vmselect/prometheus/export.qtpl:161 +//line app/vmselect/prometheus/export.qtpl:143 qw422016.N().S(`}`) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 } -//line app/vmselect/prometheus/export.qtpl:163 -func WriteExportPromAPIResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 +func WriteExportPromAPIFooter(qq422016 qtio422016.Writer, qt *querytracer.Tracer) { +//line app/vmselect/prometheus/export.qtpl:145 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:163 - StreamExportPromAPIResponse(qw422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 + StreamExportPromAPIFooter(qw422016, qt) +//line app/vmselect/prometheus/export.qtpl:145 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 } -//line app/vmselect/prometheus/export.qtpl:163 -func ExportPromAPIResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string { -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 +func ExportPromAPIFooter(qt *querytracer.Tracer) string { +//line app/vmselect/prometheus/export.qtpl:145 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:163 - WriteExportPromAPIResponse(qb422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 + WriteExportPromAPIFooter(qb422016, qt) +//line app/vmselect/prometheus/export.qtpl:145 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 return qs422016 -//line app/vmselect/prometheus/export.qtpl:163 +//line app/vmselect/prometheus/export.qtpl:145 } -//line app/vmselect/prometheus/export.qtpl:165 -func StreamExportStdResponse(qw422016 *qt422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { -//line app/vmselect/prometheus/export.qtpl:166 - for bb := range resultsCh { -//line app/vmselect/prometheus/export.qtpl:167 - qw422016.N().Z(bb.B) -//line app/vmselect/prometheus/export.qtpl:168 - quicktemplate.ReleaseByteBuffer(bb) - -//line app/vmselect/prometheus/export.qtpl:169 - } -//line app/vmselect/prometheus/export.qtpl:170 -} - -//line app/vmselect/prometheus/export.qtpl:170 -func WriteExportStdResponse(qq422016 qtio422016.Writer, resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) { -//line app/vmselect/prometheus/export.qtpl:170 - qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:170 - StreamExportStdResponse(qw422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:170 - qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:170 -} - -//line app/vmselect/prometheus/export.qtpl:170 -func ExportStdResponse(resultsCh <-chan *quicktemplate.ByteBuffer, qt *querytracer.Tracer) string { -//line app/vmselect/prometheus/export.qtpl:170 - qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:170 - WriteExportStdResponse(qb422016, resultsCh, qt) -//line app/vmselect/prometheus/export.qtpl:170 - qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:170 - qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:170 - return qs422016 -//line app/vmselect/prometheus/export.qtpl:170 -} - -//line app/vmselect/prometheus/export.qtpl:172 +//line app/vmselect/prometheus/export.qtpl:147 func streamprometheusMetricName(qw422016 *qt422016.Writer, mn *storage.MetricName) { -//line app/vmselect/prometheus/export.qtpl:173 +//line app/vmselect/prometheus/export.qtpl:148 qw422016.N().Z(mn.MetricGroup) -//line app/vmselect/prometheus/export.qtpl:174 +//line app/vmselect/prometheus/export.qtpl:149 if len(mn.Tags) > 0 { -//line app/vmselect/prometheus/export.qtpl:174 +//line app/vmselect/prometheus/export.qtpl:149 qw422016.N().S(`{`) -//line app/vmselect/prometheus/export.qtpl:176 +//line app/vmselect/prometheus/export.qtpl:151 tags := mn.Tags -//line app/vmselect/prometheus/export.qtpl:177 +//line app/vmselect/prometheus/export.qtpl:152 qw422016.N().Z(tags[0].Key) -//line app/vmselect/prometheus/export.qtpl:177 +//line app/vmselect/prometheus/export.qtpl:152 qw422016.N().S(`=`) -//line app/vmselect/prometheus/export.qtpl:177 +//line app/vmselect/prometheus/export.qtpl:152 qw422016.N().QZ(tags[0].Value) -//line app/vmselect/prometheus/export.qtpl:178 +//line app/vmselect/prometheus/export.qtpl:153 tags = tags[1:] -//line app/vmselect/prometheus/export.qtpl:179 +//line app/vmselect/prometheus/export.qtpl:154 for i := range tags { -//line app/vmselect/prometheus/export.qtpl:180 +//line app/vmselect/prometheus/export.qtpl:155 tag := &tags[i] -//line app/vmselect/prometheus/export.qtpl:180 +//line app/vmselect/prometheus/export.qtpl:155 qw422016.N().S(`,`) -//line app/vmselect/prometheus/export.qtpl:181 +//line app/vmselect/prometheus/export.qtpl:156 qw422016.N().Z(tag.Key) -//line app/vmselect/prometheus/export.qtpl:181 +//line app/vmselect/prometheus/export.qtpl:156 qw422016.N().S(`=`) -//line app/vmselect/prometheus/export.qtpl:181 +//line app/vmselect/prometheus/export.qtpl:156 qw422016.N().QZ(tag.Value) -//line app/vmselect/prometheus/export.qtpl:182 +//line app/vmselect/prometheus/export.qtpl:157 } -//line app/vmselect/prometheus/export.qtpl:182 +//line app/vmselect/prometheus/export.qtpl:157 qw422016.N().S(`}`) -//line app/vmselect/prometheus/export.qtpl:184 +//line app/vmselect/prometheus/export.qtpl:159 } -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 } -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 func writeprometheusMetricName(qq422016 qtio422016.Writer, mn *storage.MetricName) { -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qw422016 := qt422016.AcquireWriter(qq422016) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 streamprometheusMetricName(qw422016, mn) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qt422016.ReleaseWriter(qw422016) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 } -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 func prometheusMetricName(mn *storage.MetricName) string { -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qb422016 := qt422016.AcquireByteBuffer() -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 writeprometheusMetricName(qb422016, mn) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qs422016 := string(qb422016.B) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 qt422016.ReleaseByteBuffer(qb422016) -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 return qs422016 -//line app/vmselect/prometheus/export.qtpl:185 +//line app/vmselect/prometheus/export.qtpl:160 } diff --git a/app/vmselect/prometheus/prometheus.go b/app/vmselect/prometheus/prometheus.go index 5eec38302..515b619b5 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -6,9 +6,11 @@ import ( "math" "net" "net/http" + "runtime" "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" @@ -18,7 +20,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" "github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -28,7 +29,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/metrics" "github.com/valyala/fastjson/fastfloat" - "github.com/valyala/quicktemplate" ) var ( @@ -92,37 +92,19 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, w.Header().Set("Content-Type", "text/plain; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - var m sync.Map + sw := newScalableWriter(bw) err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error { if err := bw.Error(); err != nil { return err } - v, ok := m.Load(workerID) - if !ok { - v = &bytesutil.ByteBuffer{} - m.Store(workerID, v) - } - bb := v.(*bytesutil.ByteBuffer) + bb := sw.getBuffer(workerID) WriteFederate(bb, rs) - if len(bb.B) < 1024*1024 { - return nil - } - _, err := bw.Write(bb.B) - bb.Reset() - return err + return sw.maybeFlushBuffer(bb) }) if err != nil { return fmt.Errorf("error during sending data to remote client: %w", err) } - m.Range(func(k, v interface{}) bool { - bb := v.(*bytesutil.ByteBuffer) - _, err := bw.Write(bb.B) - return err == nil - }) - if err := bw.Flush(); err != nil { - return err - } - return nil + return sw.flush() } var federateDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/federate"}`) @@ -147,15 +129,14 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter w.Header().Set("Content-Type", "text/csv; charset=utf-8") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) - - resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) - writeCSVLine := func(xb *exportBlock) { + sw := newScalableWriter(bw) + writeCSVLine := func(xb *exportBlock, workerID uint) error { if len(xb.timestamps) == 0 { - return + return nil } - bb := quicktemplate.AcquireByteBuffer() + bb := sw.getBuffer(workerID) WriteExportCSVLine(bb, xb, fieldNames) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } doneCh := make(chan error, 1) if !reduceMemUsage { @@ -175,17 +156,18 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter xb.mn = &rs.MetricName xb.timestamps = rs.Timestamps xb.values = rs.Values - writeCSVLine(xb) + if err := writeCSVLine(xb, workerID); err != nil { + return err + } xb.reset() exportBlockPool.Put(xb) return nil }) - close(resultsCh) doneCh <- err }() } else { go func() { - err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err := netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } @@ -195,29 +177,21 @@ func ExportCSVHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter xb := exportBlockPool.Get().(*exportBlock) xb.mn = mn xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) - writeCSVLine(xb) + if err := writeCSVLine(xb, workerID); err != nil { + return err + } xb.reset() exportBlockPool.Put(xb) return nil }) - close(resultsCh) doneCh <- err }() } - // Consume all the data from resultsCh. - for bb := range resultsCh { - // Do not check for error in bw.Write, since this error is checked inside netstorage.ExportBlocks above. - _, _ = bw.Write(bb.B) - quicktemplate.ReleaseByteBuffer(bb) - } - if err := bw.Flush(); err != nil { - return err - } err = <-doneCh if err != nil { return fmt.Errorf("error during sending the exported csv data to remote client: %w", err) } - return nil + return sw.flush() } var exportCSVDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/csv"}`) @@ -235,6 +209,7 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri w.Header().Set("Content-Type", "VictoriaMetrics/native") bw := bufferedwriter.Get(w) defer bufferedwriter.Put(bw) + sw := newScalableWriter(bw) // Marshal tr trBuf := make([]byte, 0, 16) @@ -243,13 +218,13 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri _, _ = bw.Write(trBuf) // Marshal native blocks. - err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err = netstorage.ExportBlocks(nil, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } - dstBuf := bbPool.Get() + bb := sw.getBuffer(workerID) + dst := bb.B tmpBuf := bbPool.Get() - dst := dstBuf.B tmp := tmpBuf.B // Marshal mn @@ -265,19 +240,13 @@ func ExportNativeHandler(startTime time.Time, at *auth.Token, w http.ResponseWri tmpBuf.B = tmp bbPool.Put(tmpBuf) - _, err := bw.Write(dst) - - dstBuf.B = dst - bbPool.Put(dstBuf) - return err + bb.B = dst + return sw.maybeFlushBuffer(bb) }) if err != nil { return fmt.Errorf("error during sending native data to remote client: %w", err) } - if err := bw.Flush(); err != nil { - return fmt.Errorf("error during flushing native data to remote client: %w", err) - } - return nil + return sw.flush() } var exportNativeDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export/native"}`) @@ -304,31 +273,48 @@ func ExportHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter, r var exportDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/export"}`) func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter, cp *commonParams, format string, maxRowsPerLine int, reduceMemUsage bool) error { - writeResponseFunc := WriteExportStdResponse - writeLineFunc := func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + sw := newScalableWriter(bw) + writeLineFunc := func(xb *exportBlock, workerID uint) error { + bb := sw.getBuffer(workerID) WriteExportJSONLine(bb, xb) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } contentType := "application/stream+json; charset=utf-8" if format == "prometheus" { contentType = "text/plain; charset=utf-8" - writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() + writeLineFunc = func(xb *exportBlock, workerID uint) error { + bb := sw.getBuffer(workerID) WriteExportPrometheusLine(bb, xb) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } } else if format == "promapi" { - writeResponseFunc = WriteExportPromAPIResponse - writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { - bb := quicktemplate.AcquireByteBuffer() + WriteExportPromAPIHeader(bw) + firstLineOnce := uint32(0) + firstLineSent := uint32(0) + writeLineFunc = func(xb *exportBlock, workerID uint) error { + bb := sw.getBuffer(workerID) + if atomic.CompareAndSwapUint32(&firstLineOnce, 0, 1) { + // Send the first line to sw.bw + WriteExportPromAPILine(bb, xb) + _, err := sw.bw.Write(bb.B) + bb.Reset() + atomic.StoreUint32(&firstLineSent, 1) + return err + } + for atomic.LoadUint32(&firstLineSent) == 0 { + // Busy wait until the first line is sent to sw.bw + runtime.Gosched() + } + bb.B = append(bb.B, ',') WriteExportPromAPILine(bb, xb) - resultsCh <- bb + return sw.maybeFlushBuffer(bb) } } if maxRowsPerLine > 0 { writeLineFuncOrig := writeLineFunc - writeLineFunc = func(xb *exportBlock, resultsCh chan<- *quicktemplate.ByteBuffer) { + writeLineFunc = func(xb *exportBlock, workerID uint) error { valuesOrig := xb.values timestampsOrig := xb.timestamps values := valuesOrig @@ -349,19 +335,19 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter } xb.values = valuesChunk xb.timestamps = timestampsChunk - writeLineFuncOrig(xb, resultsCh) + if err := writeLineFuncOrig(xb, workerID); err != nil { + return err + } } xb.values = valuesOrig xb.timestamps = timestampsOrig + return nil } } sq := storage.NewSearchQuery(at.AccountID, at.ProjectID, cp.start, cp.end, cp.filterss, *maxExportSeries) w.Header().Set("Content-Type", contentType) - bw := bufferedwriter.Get(w) - defer bufferedwriter.Put(bw) - resultsCh := make(chan *quicktemplate.ByteBuffer, cgroup.AvailableCPUs()) doneCh := make(chan error, 1) if !reduceMemUsage { // Unconditionally deny partial response for the exported data, @@ -381,19 +367,20 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter xb.mn = &rs.MetricName xb.timestamps = rs.Timestamps xb.values = rs.Values - writeLineFunc(xb, resultsCh) + if err := writeLineFunc(xb, workerID); err != nil { + return err + } xb.reset() exportBlockPool.Put(xb) return nil }) qtChild.Done() - close(resultsCh) doneCh <- err }() } else { qtChild := qt.NewChild("background export format=%s", format) go func() { - err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange) error { + err := netstorage.ExportBlocks(qtChild, sq, cp.deadline, func(mn *storage.MetricName, b *storage.Block, tr storage.TimeRange, workerID uint) error { if err := bw.Error(); err != nil { return err } @@ -404,26 +391,30 @@ func exportHandler(qt *querytracer.Tracer, at *auth.Token, w http.ResponseWriter xb.mn = mn xb.timestamps, xb.values = b.AppendRowsWithTimeRangeFilter(xb.timestamps[:0], xb.values[:0], tr) if len(xb.timestamps) > 0 { - writeLineFunc(xb, resultsCh) + if err := writeLineFunc(xb, workerID); err != nil { + return err + } } xb.reset() exportBlockPool.Put(xb) return nil }) qtChild.Done() - close(resultsCh) doneCh <- err }() } - - // writeResponseFunc must consume all the data from resultsCh. - writeResponseFunc(bw, resultsCh, qt) - if err := bw.Flush(); err != nil { - return err - } err := <-doneCh if err != nil { - return fmt.Errorf("error during sending the data to remote client: %w", err) + return fmt.Errorf("cannot send data to remote client: %w", err) + } + if err := sw.flush(); err != nil { + return fmt.Errorf("cannot send data to remote client: %w", err) + } + if format == "promapi" { + WriteExportPromAPIFooter(bw, qt) + } + if err := bw.Flush(); err != nil { + return err } return nil } @@ -1212,3 +1203,41 @@ func getCommonParams(r *http.Request, startTime time.Time, requireNonEmptyMatch } return cp, nil } + +type scalableWriter struct { + bw *bufferedwriter.Writer + m sync.Map +} + +func newScalableWriter(bw *bufferedwriter.Writer) *scalableWriter { + return &scalableWriter{ + bw: bw, + } +} + +func (sw *scalableWriter) getBuffer(workerID uint) *bytesutil.ByteBuffer { + v, ok := sw.m.Load(workerID) + if !ok { + v = &bytesutil.ByteBuffer{} + sw.m.Store(workerID, v) + } + return v.(*bytesutil.ByteBuffer) +} + +func (sw *scalableWriter) maybeFlushBuffer(bb *bytesutil.ByteBuffer) error { + if len(bb.B) < 1024*1024 { + return nil + } + _, err := sw.bw.Write(bb.B) + bb.Reset() + return err +} + +func (sw *scalableWriter) flush() error { + sw.m.Range(func(k, v interface{}) bool { + bb := v.(*bytesutil.ByteBuffer) + _, err := sw.bw.Write(bb.B) + return err == nil + }) + return sw.bw.Flush() +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 6f3ece426..f828e66bc 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -27,6 +27,7 @@ See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#m * FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): support specifying tenant ids via `vm_account_id` and `vm_project_id` labels. See [these docs](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#multitenancy-via-labels) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2970). * FEATURE: [vmagent](https://docs.victoriametrics.com/vmagent.html): improve [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) performance by up to 3x for non-trivial `regex` values such as `([^:]+):.+`, which can be used for extracting a `host` part from `host:port` label value. * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): improve performance by up to 4x for queries containing non-trivial `regex` filters such as `{path=~"/foo/.+|/bar"}`. +* FEATURE: improve performance scalability on systems with many CPU cores for [/federate](https://docs.victoriametrics.com/#federation) and [/api/v1/export/...](https://docs.victoriametrics.com/#how-to-export-time-series) endpoints. * FEATURE: sanitize metric names for data ingested via [DataDog protocol](https://docs.victoriametrics.com/#how-to-send-data-from-datadog-agent) according to [DataDog metric naming](https://docs.datadoghq.com/metrics/custom_metrics/#naming-custom-metrics). The behaviour can be disabled by passing `-datadog.sanitizeMetricName=false` command-line flag. Thanks to @PerGon for [the pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3105). * FEATURE: add `-usePromCompatibleNaming` command-line flag to [vmagent](https://docs.victoriametrics.com/vmagent.html), to single-node VictoriaMetrics and to `vminsert` component of VictoriaMetrics cluster. This flag can be used for normalizing the ingested metric names and label names to [Prometheus-compatible form](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). If this flag is set, then all the chars unsupported by Prometheus are replaced with `_` chars in metric names and labels of the ingested samples. See [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3113). * FEATURE: accept whitespace in metric names and tags ingested via [Graphite plaintext protocol](https://docs.victoriametrics.com/#how-to-send-data-from-graphite-compatible-agents-such-as-statsd) according to [the specs](https://graphite.readthedocs.io/en/latest/tags.html). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3102).