diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go
index b7002810b..7aa6e2974 100644
--- a/app/vmselect/netstorage/netstorage.go
+++ b/app/vmselect/netstorage/netstorage.go
@@ -1644,29 +1644,44 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock,
 	// Do not intern mb.MetricName, since it leads to increased memory usage.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692
 	metricName := mb.MetricName
+	var generationID int64
+
+	mn := storage.GetMetricName()
+	defer storage.PutMetricName(mn)
+	if err := mn.Unmarshal(metricName); err != nil {
+		return fmt.Errorf("cannot unmarshal metricName: %q %w", metricName, err)
+	}
+	generationIDTag := mn.RemoveTagWithResult(`__generation_id`)
+	if generationIDTag != nil {
+		generationID, err = strconv.ParseInt(string(generationIDTag.Value), 10, 64)
+		if err != nil {
+			return fmt.Errorf("cannot parse __generation_id label value: %s : %w", generationIDTag.Value, err)
+		}
+		metricName = mn.Marshal(metricName[:0])
+	}
+	// process data blocks with metric updates
+	// TODO profile it, probably it's better to replace mutex with per worker lock-free struct
+	if generationID > 0 {
+		tbfw.mu.Lock()
+		defer tbfw.mu.Unlock()
+		ups := tbfw.seriesUpdatesByMetricName[string(metricName)]
+		if ups == nil {
+			// fast path
+			tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{generationID: {addr}}
+			return nil
+		}
+		// todo memory optimization for metricNames, use interning?
+		addrs := tbfw.seriesUpdatesByMetricName[string(metricName)][generationID]
+		addrs = append(addrs, addr)
+		tbfw.seriesUpdatesByMetricName[string(metricName)][generationID] = addrs
+		return nil
+	}
 	m := tbfw.ms[workerID]
 	addrs := m[string(metricName)]
 	if addrs == nil {
 		addrs = newBlockAddrs()
 	}
 	addrs.addrs = append(addrs.addrs, addr)
-	// process data blocks with metric updates
-	// TODO profile it, probably it's better to replace mutex with per worker lock-free struct
-	if mb.GenerationID > 0 {
-		tbfw.mu.Lock()
-		defer tbfw.mu.Unlock()
-		ups := tbfw.seriesUpdatesByMetricName[string(metricName)]
-		if ups == nil {
-			// fast path
-			tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{mb.GenerationID: {addr}}
-			return nil
-		}
-		// todo memory optimization for metricNames, use interning?
-		addrs := tbfw.seriesUpdatesByMetricName[string(metricName)][mb.GenerationID]
-		addrs = append(addrs, addr)
-		tbfw.seriesUpdatesByMetricName[string(metricName)][mb.GenerationID] = addrs
-		return nil
-	}
 	if len(addrs.addrs) == 1 {
 		// An optimization for big number of time series with long names: store only a single copy of metricNameStr
 		// in both tbfw.orderedMetricNamess and tbfw.ms.
@@ -1735,11 +1750,6 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		if err := mn.Unmarshal(mb.MetricName); err != nil {
 			return fmt.Errorf("cannot unmarshal metricName: %w", err)
 		}
-		// add generation id label
-		// it should help user migrate data between instance
-		if mb.GenerationID > 0 {
-			mn.AddTag("__generation_id", strconv.FormatInt(mb.GenerationID, 10))
-		}
 		if err := f(mn, &mb.Block, tr, workerID); err != nil {
 			return err
 		}
@@ -2333,7 +2343,7 @@ func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData []
 		}
 		return nil
 	}
-	return sn.execOnConnWithPossibleRetry(qt, "search_v8", f, deadline)
+	return sn.execOnConnWithPossibleRetry(qt, "search_v7", f, deadline)
 }
 
 func (sn *storageNode) execOnConnWithPossibleRetry(qt *querytracer.Tracer, funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error {
diff --git a/lib/storage/search.go b/lib/storage/search.go
index 9c2c36526..5dc23cd74 100644
--- a/lib/storage/search.go
+++ b/lib/storage/search.go
@@ -56,23 +56,12 @@ type MetricBlockRef struct {
 type MetricBlock struct {
 	// MetricName is metric name for the given Block.
 	MetricName []byte
-
-	// GenerationID unique ID for series update, extracted from Tag __generation_id at runtime
-	GenerationID int64
-
 	// Block is a block for the given MetricName
 	Block Block
 }
 
 // Marshal marshals MetricBlock to dst
 func (mb *MetricBlock) Marshal(dst []byte) []byte {
-	dst = encoding.MarshalBytes(dst, mb.MetricName)
-	dst = encoding.MarshalInt64(dst, mb.GenerationID)
-	return MarshalBlock(dst, &mb.Block)
-}
-
-// MarshalV7 marshals MetricBlock to dst at v7 api version
-func (mb *MetricBlock) MarshalV7(dst []byte) []byte {
 	dst = encoding.MarshalBytes(dst, mb.MetricName)
 	return MarshalBlock(dst, &mb.Block)
 }
@@ -102,8 +91,7 @@ func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) {
 	}
 	mb.MetricName = append(mb.MetricName[:0], mn...)
 	src = tail
-	mb.GenerationID = encoding.UnmarshalInt64(src)
-	src = src[8:]
+
 	return UnmarshalBlock(&mb.Block, src)
 }
 
diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go
index b25f83e17..8fd38c3f2 100644
--- a/lib/vmselectapi/server.go
+++ b/lib/vmselectapi/server.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"io"
 	"net"
-	"strconv"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -559,8 +558,6 @@ func (s *Server) endConcurrentRequest() {
 func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
 	switch rpcName {
 	case "search_v7":
-		return s.processSearchV7(ctx)
-	case "search_v8":
 		return s.processSearch(ctx)
 	case "searchMetricNames_v3":
 		return s.processSearchMetricNames(ctx)
@@ -1040,26 +1037,10 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
 
 	// Send found blocks to vmselect.
 	blocksRead := 0
-	mn := storage.GetMetricName()
-	defer storage.PutMetricName(mn)
 	for bi.NextBlock(&ctx.mb) {
 		blocksRead++
 		s.metricBlocksRead.Inc()
 		s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
-		ctx.mb.GenerationID = 0
-		mn.Reset()
-		if err := mn.Unmarshal(ctx.mb.MetricName); err != nil {
-			return fmt.Errorf("cannot unmarshal metricName: %q %w", ctx.mb.MetricName, err)
-		}
-		generationIDTag := mn.RemoveTagWithResult(`__generation_id`)
-		if generationIDTag != nil {
-			id, err := strconv.ParseInt(string(generationIDTag.Value), 10, 64)
-			if err != nil {
-				return fmt.Errorf("cannot parse __generation_id label value: %s : %w", generationIDTag.Value, err)
-			}
-			ctx.mb.GenerationID = id
-			ctx.mb.MetricName = mn.Marshal(ctx.mb.MetricName[:0])
-		}
 
 		ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0])
 
@@ -1078,50 +1059,3 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
 	}
 	return nil
 }
-
-func (s *Server) processSearchV7(ctx *vmselectRequestCtx) error {
-	s.searchRequests.Inc()
-
-	// Read request.
-	if err := ctx.readSearchQuery(); err != nil {
-		return err
-	}
-
-	// Initiaialize the search.
-	startTime := time.Now()
-	bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)
-	if err != nil {
-		return ctx.writeErrorMessage(err)
-	}
-	s.indexSearchDuration.UpdateDuration(startTime)
-	defer bi.MustClose()
-
-	// Send empty error message to vmselect.
-	if err := ctx.writeString(""); err != nil {
-		return fmt.Errorf("cannot send empty error message: %w", err)
-	}
-
-	// Send found blocks to vmselect.
-	blocksRead := 0
-	for bi.NextBlock(&ctx.mb) {
-		blocksRead++
-		s.metricBlocksRead.Inc()
-		s.metricRowsRead.Add(ctx.mb.Block.RowsCount())
-
-		ctx.dataBuf = ctx.mb.MarshalV7(ctx.dataBuf[:0])
-
-		if err := ctx.writeDataBufBytes(); err != nil {
-			return fmt.Errorf("cannot send MetricBlock: %w", err)
-		}
-	}
-	if err := bi.Error(); err != nil {
-		return fmt.Errorf("search error: %w", err)
-	}
-	ctx.qt.Printf("sent %d blocks to vmselect", blocksRead)
-
-	// Send 'end of response' marker
-	if err := ctx.writeString(""); err != nil {
-		return fmt.Errorf("cannot send 'end of response' marker")
-	}
-	return nil
-}
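
Note: with this change the storage node no longer strips the `__generation_id` tag on the wire path; the RPC stays at `search_v7` with the unchanged `MetricBlock` layout, and vmselect peels the tag off inside `RegisterAndWriteBlock` before grouping block addresses by generation. Below is a minimal standalone sketch of that extraction step, reusing the same `lib/storage` helpers as the hunk above. It assumes `RemoveTagWithResult` returns a `*storage.Tag` (nil when the tag is absent), as the hunk implies; the sample metric name and generation value in `main` are made up for illustration only.

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)

// extractGenerationID peels the __generation_id tag off a marshaled MetricName,
// returning the re-marshaled name without the tag and the parsed generation ID
// (0 when the tag is absent).
func extractGenerationID(metricName []byte) ([]byte, int64, error) {
	mn := storage.GetMetricName()
	defer storage.PutMetricName(mn)
	if err := mn.Unmarshal(metricName); err != nil {
		return nil, 0, fmt.Errorf("cannot unmarshal metricName %q: %w", metricName, err)
	}
	tag := mn.RemoveTagWithResult("__generation_id")
	if tag == nil {
		// No update generation attached - leave the metric name as is.
		return metricName, 0, nil
	}
	generationID, err := strconv.ParseInt(string(tag.Value), 10, 64)
	if err != nil {
		return nil, 0, fmt.Errorf("cannot parse __generation_id label value %q: %w", tag.Value, err)
	}
	// Re-marshal the metric name without the __generation_id tag, reusing the caller's buffer.
	return mn.Marshal(metricName[:0]), generationID, nil
}

func main() {
	// Build a sample metric name carrying a __generation_id tag (values are illustrative).
	mn := storage.GetMetricName()
	mn.MetricGroup = []byte("http_requests_total")
	mn.AddTag("__generation_id", "3")
	raw := mn.Marshal(nil)
	storage.PutMetricName(mn)

	name, generationID, err := extractGenerationID(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("generationID=%d, metricName without the tag is %d bytes\n", generationID, len(name))
}
```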