mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-20 15:16:42 +00:00
app/vmselect/prometheus: improve scalability of /federate
endpoint on systems with many CPU cores
Minimize usage of global lock inside bufferedwriter.Write() when processing `/federate` data on systems with many CPU cores
This commit is contained in:
parent
fb1cc3cc94
commit
49311ae977
1 changed files with 16 additions and 2 deletions
|
@ -84,19 +84,33 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
|
||||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||||
bw := bufferedwriter.Get(w)
|
bw := bufferedwriter.Get(w)
|
||||||
defer bufferedwriter.Put(bw)
|
defer bufferedwriter.Put(bw)
|
||||||
|
var m sync.Map
|
||||||
err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error {
|
err = rss.RunParallel(nil, func(rs *netstorage.Result, workerID uint) error {
|
||||||
if err := bw.Error(); err != nil {
|
if err := bw.Error(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
bb := quicktemplate.AcquireByteBuffer()
|
v, ok := m.Load(workerID)
|
||||||
|
if !ok {
|
||||||
|
v = &bytesutil.ByteBuffer{}
|
||||||
|
m.Store(workerID, v)
|
||||||
|
}
|
||||||
|
bb := v.(*bytesutil.ByteBuffer)
|
||||||
WriteFederate(bb, rs)
|
WriteFederate(bb, rs)
|
||||||
|
if len(bb.B) < 1024*1024 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
_, err := bw.Write(bb.B)
|
_, err := bw.Write(bb.B)
|
||||||
quicktemplate.ReleaseByteBuffer(bb)
|
bb.Reset()
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error during sending data to remote client: %w", err)
|
return fmt.Errorf("error during sending data to remote client: %w", err)
|
||||||
}
|
}
|
||||||
|
m.Range(func(k, v interface{}) bool {
|
||||||
|
bb := v.(*bytesutil.ByteBuffer)
|
||||||
|
_, err := bw.Write(bb.B)
|
||||||
|
return err == nil
|
||||||
|
})
|
||||||
if err := bw.Flush(); err != nil {
|
if err := bw.Flush(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue