app/vmselect: moves series update logic to vmselect

This should simplify migration while keeping good performance for the vmstorage component.
f41gh7 2023-07-06 08:34:51 +02:00
parent 1ab593f807
commit 23e53bdb80
3 changed files with 34 additions and 102 deletions
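
For context: a series update is an ordinary sample whose metric name carries a __generation_id label, and this commit makes vmselect, rather than vmstorage, strip that label and group blocks by generation. The sketch below is illustrative only and is not code from this commit; it uses a plain string map instead of storage.MetricName, but the label name and the strconv.ParseInt handling mirror the diff.

    package main

    import (
    	"fmt"
    	"strconv"
    )

    // splitGenerationID extracts and removes the __generation_id label,
    // the way vmselect does in RegisterAndWriteBlock below. A generation
    // of 0 means the sample is a regular series, not an update.
    func splitGenerationID(labels map[string]string) (int64, error) {
    	v, ok := labels["__generation_id"]
    	if !ok {
    		return 0, nil
    	}
    	// Strip the label so all generations of a series share one name.
    	delete(labels, "__generation_id")
    	return strconv.ParseInt(v, 10, 64)
    }

    func main() {
    	labels := map[string]string{"__name__": "temperature", "__generation_id": "2"}
    	gen, err := splitGenerationID(labels)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(gen, labels) // 2 map[__name__:temperature]
    }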

Changed file 1 of 3

@@ -1644,29 +1644,44 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
 	// Do not intern mb.MetricName, since it leads to increased memory usage.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692
 	metricName := mb.MetricName
+	var generationID int64
+	mn := storage.GetMetricName()
+	defer storage.PutMetricName(mn)
+	if err := mn.Unmarshal(metricName); err != nil {
+		return fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
+	}
+	generationIDTag := mn.RemoveTagWithResult(`__generation_id`)
+	if generationIDTag != nil {
+		var err error
+		generationID, err = strconv.ParseInt(string(generationIDTag.Value), 10, 64)
+		if err != nil {
+			return fmt.Errorf("cannot parse __generation_id label value %q: %w", generationIDTag.Value, err)
+		}
+		// Re-marshal the metric name without the __generation_id label,
+		// so all generations of a series share the same map key.
+		metricName = mn.Marshal(metricName[:0])
+	}
+	// Process data blocks that carry series updates.
+	// TODO: profile this; a per-worker lock-free structure may beat the shared mutex.
+	if generationID > 0 {
+		tbfw.mu.Lock()
+		defer tbfw.mu.Unlock()
+		ups := tbfw.seriesUpdatesByMetricName[string(metricName)]
+		if ups == nil {
+			// Fast path: the first update seen for this metric name.
+			tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{generationID: {addr}}
+			return nil
+		}
+		// TODO: reduce memory usage for metric names, e.g. via interning.
+		ups[generationID] = append(ups[generationID], addr)
+		return nil
+	}
 	m := tbfw.ms[workerID]
 	addrs := m[string(metricName)]
 	if addrs == nil {
 		addrs = newBlockAddrs()
 	}
 	addrs.addrs = append(addrs.addrs, addr)
-	// process data blocks with metric updates
-	// TODO profile it, probably it's better to replace mutex with per worker lock-free struct
-	if mb.GenerationID > 0 {
-		tbfw.mu.Lock()
-		defer tbfw.mu.Unlock()
-		ups := tbfw.seriesUpdatesByMetricName[string(metricName)]
-		if ups == nil {
-			// fast path
-			tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{mb.GenerationID: {addr}}
-			return nil
-		}
-		// todo memory optimization for metricNames, use interning?
-		addrs := tbfw.seriesUpdatesByMetricName[string(metricName)][mb.GenerationID]
-		addrs = append(addrs, addr)
-		tbfw.seriesUpdatesByMetricName[string(metricName)][mb.GenerationID] = addrs
-		return nil
-	}
 	if len(addrs.addrs) == 1 {
 		// An optimization for big number of time series with long names: store only a single copy of metricNameStr
 		// in both tbfw.orderedMetricNamess and tbfw.ms.
@@ -1735,11 +1750,6 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		return fmt.Errorf("cannot unmarshal metricName: %w", err)
 	}
-	// add generation id label
-	// it should help user migrate data between instance
-	if mb.GenerationID > 0 {
-		mn.AddTag("__generation_id", strconv.FormatInt(mb.GenerationID, 10))
-	}
 	if err := f(mn, &mb.Block, tr, workerID); err != nil {
 		return err
 	}
@@ -2333,7 +2343,7 @@ func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []
 		}
 		return nil
 	}
-	return sn.execOnConnWithPossibleRetry(qt, "search_v8", f, deadline)
+	return sn.execOnConnWithPossibleRetry(qt, "search_v7", f, deadline)
 }

 func (sn *storageNode) execOnConnWithPossibleRetry(qt *querytracer.Tracer, funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
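
The new branch in RegisterAndWriteBlock relies on two wrapper fields. A hedged sketch of their assumed shapes, inferred purely from the usage in the hunk above; the field names match the diff, but the surrounding struct definition is an assumption, not part of this commit:

    // Inferred from usage in RegisterAndWriteBlock; the real struct in
    // netstorage.go has more fields than shown here.
    type tmpBlocksFileWrapper struct {
    	mu sync.Mutex
    	// Marshaled metric name (with __generation_id stripped) -> generation
    	// ID -> addresses of the blocks belonging to that generation. Shared
    	// across workers, hence the mutex.
    	seriesUpdatesByMetricName map[string]map[int64][]tmpBlockAddr
    	// Per-worker maps for regular (non-update) series; indexed by
    	// workerID, so each worker mutates its own map without locking.
    	ms []map[string]*blockAddrs
    }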

Changed file 2 of 3

@@ -56,23 +56,12 @@ type MetricBlockRef struct {
 type MetricBlock struct {
 	// MetricName is metric name for the given Block.
 	MetricName []byte

-	// GenerationID unique ID for series update, extracted from Tag __generation_id at runtime
-	GenerationID int64
-
 	// Block is a block for the given MetricName
 	Block Block
 }

 // Marshal marshals MetricBlock to dst
 func (mb *MetricBlock) Marshal(dst []byte) []byte {
-	dst = encoding.MarshalBytes(dst, mb.MetricName)
-	dst = encoding.MarshalInt64(dst, mb.GenerationID)
-	return MarshalBlock(dst, &mb.Block)
-}
-
-// MarshalV7 marshals MetricBlock to dst at v7 api version
-func (mb *MetricBlock) MarshalV7(dst []byte) []byte {
 	dst = encoding.MarshalBytes(dst, mb.MetricName)
 	return MarshalBlock(dst, &mb.Block)
 }
@@ -102,8 +91,7 @@ func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
 	}
 	mb.MetricName = append(mb.MetricName[:0], mn...)
 	src = tail
-	mb.GenerationID = encoding.UnmarshalInt64(src)
-	src = src[8:]
 	return UnmarshalBlock(&mb.Block, src)
 }
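
After this change a MetricBlock round-trips as just a length-prefixed metric name followed by the block payload; GenerationID is no longer serialized, so Unmarshal no longer consumes the extra 8 bytes. A minimal round-trip sketch, assuming it sits in the same package as the types above:

    // metricBlockRoundTrip marshals a MetricBlock with the restored v7
    // framing and unmarshals it back; buffer reuse and error context are
    // trimmed for brevity.
    func metricBlockRoundTrip(src *MetricBlock) (*MetricBlock, error) {
    	buf := src.Marshal(nil)
    	var dst MetricBlock
    	if _, err := dst.Unmarshal(buf); err != nil {
    		return nil, err
    	}
    	return &dst, nil
    }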

Changed file 3 of 3

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"io"
 	"net"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -559,8 +558,6 @@ func (s *Server) endConcurrentRequest() {
 func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
 	switch rpcName {
 	case "search_v7":
-		return s.processSearchV7(ctx)
-	case "search_v8":
 		return s.processSearch(ctx)
 	case "searchMetricNames_v3":
 		return s.processSearchMetricNames(ctx)
@@ -1040,26 +1037,10 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
 	// Send found blocks to vmselect.
 	blocksRead := 0
-	mn := storage.GetMetricName()
-	defer storage.PutMetricName(mn)
 	for bi.NextBlock(&ctx.mb) {
 		blocksRead++
 		s.metricBlocksRead.Inc()
 		s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
-		ctx.mb.GenerationID = 0
-		mn.Reset()
-		if err := mn.Unmarshal(ctx.mb.MetricName); err != nil {
-			return fmt.Errorf("cannot unmarshal metricName: %q %w", ctx.mb.MetricName, err)
-		}
-		generationIDTag := mn.RemoveTagWithResult(`__generation_id`)
-		if generationIDTag != nil {
-			id, err := strconv.ParseInt(string(generationIDTag.Value), 10, 64)
-			if err != nil {
-				return fmt.Errorf("cannot parse __generation_id label value: %s : %w", generationIDTag.Value, err)
-			}
-			ctx.mb.GenerationID = id
-			ctx.mb.MetricName = mn.Marshal(ctx.mb.MetricName[:0])
-		}

 		ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
@@ -1078,50 +1059,3 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
 	}
 	return nil
 }
-
-func (s *Server) processSearchV7(ctx *vmselectRequestCtx) error {
-	s.searchRequests.Inc()
-
-	// Read request.
-	if err := ctx.readSearchQuery(); err != nil {
-		return err
-	}
-
-	// Initiaialize the search.
-	startTime := time.Now()
-	bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)
-	if err != nil {
-		return ctx.writeErrorMessage(err)
-	}
-	s.indexSearchDuration.UpdateDuration(startTime)
-	defer bi.MustClose()
-
-	// Send empty error message to vmselect.
-	if err := ctx.writeString(""); err != nil {
-		return fmt.Errorf("cannot send empty error message: %w", err)
-	}
-
-	// Send found blocks to vmselect.
-	blocksRead := 0
-	for bi.NextBlock(&ctx.mb) {
-		blocksRead++
-		s.metricBlocksRead.Inc()
-		s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
-
-		ctx.dataBuf = ctx.mb.MarshalV7(ctx.dataBuf[:0])
-		if err := ctx.writeDataBufBytes(); err != nil {
-			return fmt.Errorf("cannot send MetricBlock: %w", err)
-		}
-	}
-	if err := bi.Error(); err != nil {
-		return fmt.Errorf("search error: %w", err)
-	}
-	ctx.qt.Printf("sent %d blocks to vmselect", blocksRead)
-
-	// Send 'end of response' marker
-	if err := ctx.writeString(""); err != nil {
-		return fmt.Errorf("cannot send 'end of response' marker")
-	}
-	return nil
-}
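
Downstream, vmselect resolves updates per series from the grouped addresses. The merge policy is outside this diff; the sketch below is a hypothetical consumer that assumes a larger __generation_id supersedes smaller ones, which is one plausible reading of "series update", not something this commit confirms:

    // latestGeneration picks the block addresses of the highest generation
    // for one metric name. Hypothetical helper; not part of this commit.
    func latestGeneration(ups map[int64][]tmpBlockAddr) (int64, []tmpBlockAddr) {
    	var maxID int64
    	var found bool
    	for id := range ups {
    		if !found || id > maxID {
    			maxID = id
    			found = true
    		}
    	}
    	return maxID, ups[maxID]
    }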