diff --git a/app/vminsert/main.go b/app/vminsert/main.go index 938dc3e0f4..e6c1524634 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -24,6 +24,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheusimport" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/promremotewrite" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/relabel" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/seriesupdate" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" @@ -248,6 +249,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "prometheus/api/v1/update/series": + // todo logging and errors. + if err := seriesupdate.InsertHandler(at, r); err != nil { + httpserver.Errorf(w, r, "%s", err) + return true + } + w.WriteHeader(http.StatusNoContent) + return true case "prometheus/api/v1/import": vmimportRequests.Inc() if err := vmimport.InsertHandler(at, r); err != nil { diff --git a/app/vminsert/seriesupdate/request_handler.go b/app/vminsert/seriesupdate/request_handler.go new file mode 100644 index 0000000000..9af559fda9 --- /dev/null +++ b/app/vminsert/seriesupdate/request_handler.go @@ -0,0 +1,80 @@ +package seriesupdate + +import ( + "net/http" + "strconv" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/vmimport" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" + "github.com/VictoriaMetrics/metrics" +) + +var ( + rowsInserted = metrics.NewCounter(`vm_rows_inserted_total{type="update_series"}`) + rowsTenantInserted = tenantmetrics.NewCounterMap(`vm_tenant_inserted_rows_total{type="update_series"}`) + rowsPerInsert = metrics.NewHistogram(`vm_rows_per_insert{type="update_series"}`) +) + +// Returns local unique generationID. +func generateUniqueGenerationID() []byte { + nextID := time.Now().UnixNano() + return []byte(strconv.FormatInt(nextID, 10)) +} + +// InsertHandler processes `/api/v1/update/series` request. +func InsertHandler(at *auth.Token, req *http.Request) error { + isGzipped := req.Header.Get("Content-Encoding") == "gzip" + return parser.ParseStream(req.Body, isGzipped, func(rows []parser.Row) error { + return insertRows(at, rows) + }) +} + +func insertRows(at *auth.Token, rows []parser.Row) error { + ctx := netstorage.GetInsertCtx() + defer netstorage.PutInsertCtx(ctx) + + ctx.Reset() // This line is required for initializing ctx internals. + rowsTotal := 0 + generationID := generateUniqueGenerationID() + for i := range rows { + r := &rows[i] + rowsTotal += len(r.Values) + ctx.Labels = ctx.Labels[:0] + for j := range r.Tags { + tag := &r.Tags[j] + ctx.AddLabelBytes(tag.Key, tag.Value) + } + + if len(ctx.Labels) == 0 { + // Skip metric without labels. + continue + } + // there is no need in relabeling and extra_label adding + // since modified series already passed this phase during ingestion, + // and it may lead to unexpected result for user. 
+ ctx.AddLabelBytes([]byte(`__generation_id`), generationID) + ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], at.AccountID, at.ProjectID, ctx.Labels) + values := r.Values + timestamps := r.Timestamps + if len(timestamps) != len(values) { + logger.Panicf("BUG: len(timestamps)=%d must match len(values)=%d", len(timestamps), len(values)) + } + atLocal := ctx.GetLocalAuthToken(at) + storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels) + for j, value := range values { + timestamp := timestamps[j] + if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil { + return err + } + } + } + rowsInserted.Add(rowsTotal) + rowsTenantInserted.Get(at).Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ctx.FlushBufs() +} diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 7e51d16610..1e11da36dc 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -10,6 +10,7 @@ import ( "net/http" "os" "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -352,8 +353,134 @@ var ( ) type packedTimeseries struct { - metricName string - addrs []tmpBlockAddr + samples int + metricName string + addrs []tmpBlockAddr + updateAddrs sortedTimeseriesUpdateAddrs +} + +type sortedTimeseriesUpdateAddrs [][]tmpBlockAddr + +// merges dst Result with update result +// may allocate memory +// dst and update must be sorted by Timestamps +// keeps order by Timestamps +func mergeResult(dst, update *Result) { + // ignore timestamps with not enough points + if len(dst.Timestamps) == 0 || len(update.Timestamps) == 0 { + return + } + + firstDstTs := dst.Timestamps[0] + lastDstTs := dst.Timestamps[len(dst.Timestamps)-1] + firstUpdateTs := update.Timestamps[0] + lastUpdateTs := update.Timestamps[len(update.Timestamps)-1] + + // check lower bound + if firstUpdateTs < firstDstTs { + // fast path + pos := position(dst.Timestamps, lastUpdateTs) + if lastUpdateTs == dst.Timestamps[pos] { + pos++ + } + tailTs := dst.Timestamps[pos:] + tailVs := dst.Values[pos:] + + if len(update.Timestamps) <= pos { + // fast path, no need in memory reallocation + dst.Timestamps = append(dst.Timestamps[:0], update.Timestamps...) + dst.Timestamps = append(dst.Timestamps, tailTs...) + dst.Values = append(dst.Values[:0], update.Values...) + dst.Values = append(dst.Values, tailVs...) + return + } + // slow path, reallocate memory + reallocatedLen := len(tailTs) + len(update.Timestamps) + dst.Timestamps = make([]int64, 0, reallocatedLen) + dst.Values = make([]float64, 0, reallocatedLen) + dst.Timestamps = append(dst.Timestamps, update.Timestamps...) + dst.Timestamps = append(dst.Timestamps, tailTs...) + dst.Values = append(dst.Values, update.Values...) + dst.Values = append(dst.Values, tailVs...) + return + } + + // check upper bound + // fast path, memory allocation possible + if lastUpdateTs > lastDstTs { + // no need to check bounds + if firstUpdateTs > firstDstTs { + dst.Timestamps = append(dst.Timestamps, update.Timestamps...) + dst.Values = append(dst.Values, update.Values...) + } + pos := position(dst.Timestamps, firstUpdateTs) + dst.Timestamps = append(dst.Timestamps[:pos], update.Timestamps...) + dst.Values = append(dst.Values[:pos], update.Values...) 
+ return + } + + // changes inside given range + firstPos := position(dst.Timestamps, firstUpdateTs) + lastPos := position(dst.Timestamps, lastUpdateTs) + // corner case last timestamp overlaps + if lastUpdateTs == dst.Timestamps[lastPos] { + lastPos++ + } + headTs := dst.Timestamps[:firstPos] + tailTs := dst.Timestamps[lastPos:] + headVs := dst.Values[:firstPos] + tailValues := dst.Values[lastPos:] + if len(update.Timestamps) <= lastPos { + // fast path, no need to reallocate + dst.Timestamps = append(dst.Timestamps[:0], headTs...) + dst.Timestamps = append(dst.Timestamps, update.Timestamps...) + dst.Timestamps = append(dst.Timestamps, tailTs...) + + dst.Values = append(dst.Values[:0], headVs...) + dst.Values = append(headVs, update.Values...) + dst.Values = append(dst.Values, tailValues...) + return + } + // slow path, allocate new slice and copy values + reallocateLen := len(headTs) + len(update.Timestamps) + len(tailTs) + dst.Timestamps = make([]int64, 0, reallocateLen) + dst.Values = make([]float64, 0, reallocateLen) + + dst.Timestamps = append(dst.Timestamps, headTs...) + dst.Timestamps = append(dst.Timestamps, update.Timestamps...) + dst.Timestamps = append(dst.Timestamps, tailTs...) + + dst.Values = append(dst.Values, headVs...) + dst.Values = append(dst.Values, update.Values...) + dst.Values = append(dst.Values, tailValues...) +} + +// position searches element position at given src with binary search +// copied and modified from sort.SearchInts +// returns safe slice index +func position(src []int64, value int64) int { + // fast path + if len(src) < 1 || src[0] > value { + return 0 + } + // Define f(-1) == false and f(n) == true. + // Invariant: f(i-1) == false, f(j) == true. + i, j := int64(0), int64(len(src)) + for i < j { + h := int64(uint(i+j) >> 1) // avoid overflow when computing h + // i ≤ h < j + // return a[i] >= x + if !(src[h] >= value) { + i = h + 1 // preserves f(i-1) == false + } else { + j = h // preserves f(j) == true + } + } + // i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i. + if len(src) == int(i) { + i-- + } + return int(i) } type unpackWork struct { @@ -459,10 +586,157 @@ func (pts *packedTimeseries) Unpack(dst *Result, tbfs []*tmpBlocksFile, tr stora } dedupInterval := storage.GetDedupInterval() mergeSortBlocks(dst, sbh, dedupInterval) + seriesUpdateSbss, err := pts.unpackUpdateAddrs(tbfs, tr) + // apply updates + if len(seriesUpdateSbss) > 0 { + var updateDst Result + for _, seriesUpdateSbs := range seriesUpdateSbss { + updateDst.reset() + mergeSortBlocks(&updateDst, seriesUpdateSbs, dedupInterval) + mergeResult(dst, &updateDst) + putSortBlocksHeap(seriesUpdateSbs) + } + } putSortBlocksHeap(sbh) return nil } +func (pts *packedTimeseries) unpackUpdateAddrs(tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlocksHeap, error) { + var upwsLen int + for i := range pts.updateAddrs { + upwsLen += len(pts.updateAddrs[i]) + } + if upwsLen == 0 { + // Nothing to do + return nil, nil + } + initUnpackWork := func(upw *unpackWork, addr tmpBlockAddr) { + upw.tbfs = tbfs + upw.addr = addr + upw.tr = tr + } + dsts := make([]*sortBlocksHeap, 0, upwsLen) + samples := pts.samples + if gomaxprocs == 1 || upwsLen < 1000 { + // It is faster to unpack all the data in the current goroutine. 
+ upw := getUnpackWork() + + tmpBlock := getTmpStorageBlock() + var err error + for _, addrBlock := range pts.updateAddrs { + dst := getSortBlocksHeap() + for _, addr := range addrBlock { + initUnpackWork(upw, addr) + upw.unpack(tmpBlock) + if upw.err != nil { + return dsts, upw.err + } + samples += len(upw.sb.Timestamps) + if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { + putSortBlock(upw.sb) + err = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ + "or reduce time range for the query", *maxSamplesPerSeries) + break + } + dst.sbs = append(dst.sbs, upw.sb) + upw.reset() + } + dsts = append(dsts, dst) + } + putTmpStorageBlock(tmpBlock) + putUnpackWork(upw) + return dsts, err + } + // Slow path - spin up multiple local workers for parallel data unpacking. + // Do not use global workers pool, since it increases inter-CPU memory ping-poing, + // which reduces the scalability on systems with many CPU cores. + + // Prepare the work for workers. + upwss := make([][]*unpackWork, len(pts.updateAddrs)) + var workItems int + for i, addrs := range pts.updateAddrs { + upws := make([]*unpackWork, len(addrs)) + for j, addr := range addrs { + workItems++ + upw := getUnpackWork() + initUnpackWork(upw, addr) + upws[j] = upw + } + + upwss[i] = upws + } + + // Prepare worker channels. + workers := upwsLen + if workers > gomaxprocs { + workers = gomaxprocs + } + if workers < 1 { + workers = 1 + } + itemsPerWorker := (workItems + workers - 1) / workers + workChs := make([]chan *unpackWork, workers) + for i := range workChs { + workChs[i] = make(chan *unpackWork, itemsPerWorker) + } + + // Spread work among worker channels. + for i, upws := range upwss { + idx := i % len(workChs) + for _, upw := range upws { + workChs[idx] <- upw + } + } + // Mark worker channels as closed. + for _, workCh := range workChs { + close(workCh) + } + + // Start workers and wait until they finish the work. + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func(workerID uint) { + unpackWorker(workChs, workerID) + wg.Done() + }(uint(i)) + } + wg.Wait() + + // Collect results. + var firstErr error + for _, upws := range upwss { + sbh := getSortBlocksHeap() + for _, upw := range upws { + if upw.err != nil && firstErr == nil { + // Return the first error only, since other errors are likely the same. 
+ firstErr = upw.err + } + if firstErr == nil { + sb := upw.sb + samples += len(sb.Timestamps) + if *maxSamplesPerSeries > 0 && samples > *maxSamplesPerSeries { + putSortBlock(sb) + firstErr = fmt.Errorf("cannot process more than %d samples per series; either increase -search.maxSamplesPerSeries "+ + "or reduce time range for the query", *maxSamplesPerSeries) + } else { + sbh.sbs = append(sbh.sbs, sb) + } + } else { + putSortBlock(upw.sb) + } + putUnpackWork(upw) + } + dsts = append(dsts, sbh) + } + if firstErr != nil { + for _, sbh := range dsts { + putSortBlocksHeap(sbh) + } + } + return dsts, nil +} + func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, tr storage.TimeRange) ([]*sortBlock, error) { upwsLen := len(pts.addrs) if upwsLen == 0 { @@ -574,6 +848,7 @@ func (pts *packedTimeseries) unpackTo(dst []*sortBlock, tbfs []*tmpBlocksFile, t } putUnpackWork(upw) } + pts.samples = samples return dst, firstErr } @@ -1318,6 +1593,12 @@ type tmpBlocksFileWrapper struct { tbfs []*tmpBlocksFile ms []map[string]*blockAddrs orderedMetricNamess [][]string + // mu protects series updates + // it shouldn't cause cpu contention + // usually series updates are small + mu sync.Mutex + // updates grouped by metric name and generation ID + seriesUpdatesByMetricName map[string]map[int64][]tmpBlockAddr } type blockAddrs struct { @@ -1342,9 +1623,10 @@ func newTmpBlocksFileWrapper(sns []*storageNode) *tmpBlocksFileWrapper { ms[i] = make(map[string]*blockAddrs) } return &tmpBlocksFileWrapper{ - tbfs: tbfs, - ms: ms, - orderedMetricNamess: make([][]string, n), + tbfs: tbfs, + ms: ms, + seriesUpdatesByMetricName: make(map[string]map[int64][]tmpBlockAddr), + orderedMetricNamess: make([][]string, n), } } @@ -1365,6 +1647,23 @@ func (tbfw *tmpBlocksFileWrapper) RegisterAndWriteBlock(mb *storage.MetricBlock, addrs = newBlockAddrs() } addrs.addrs = append(addrs.addrs, addr) + // process data blocks with metric updates + // TODO profile it, probably it's better to replace mutex with per worker lock-free struct + if mb.GenerationID > 0 { + tbfw.mu.Lock() + defer tbfw.mu.Unlock() + ups := tbfw.seriesUpdatesByMetricName[string(metricName)] + if ups == nil { + // fast path + tbfw.seriesUpdatesByMetricName[string(metricName)] = map[int64][]tmpBlockAddr{mb.GenerationID: {addr}} + return nil + } + // todo memory optimization for metricNames, use interning? + addrs := tbfw.seriesUpdatesByMetricName[string(metricName)][mb.GenerationID] + addrs = append(addrs, addr) + tbfw.seriesUpdatesByMetricName[string(metricName)][mb.GenerationID] = addrs + return nil + } if len(addrs.addrs) == 1 { // An optimization for big number of time series with long names: store only a single copy of metricNameStr // in both tbfw.orderedMetricNamess and tbfw.ms. 
@@ -1432,6 +1731,12 @@ func ExportBlocks(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear if err := mn.Unmarshal(mb.MetricName); err != nil { return fmt.Errorf("cannot unmarshal metricName: %w", err) } + + // add generation id label + // it should help user migrate data between instance + if mb.GenerationID > 0 { + mn.AddTag("__generation_id", strconv.FormatInt(mb.GenerationID, 10)) + } if err := f(mn, &mb.Block, tr, workerID); err != nil { return err } @@ -1566,9 +1871,25 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st rss.tbfs = tbfw.tbfs pts := make([]packedTimeseries, len(orderedMetricNames)) for i, metricName := range orderedMetricNames { + seriesUpdateGenerations := tbfw.seriesUpdatesByMetricName[metricName] + var stua sortedTimeseriesUpdateAddrs + if len(seriesUpdateGenerations) > 0 { + stua = make(sortedTimeseriesUpdateAddrs, 0, len(seriesUpdateGenerations)) + orderedGenerationIDs := make([]int64, 0, len(seriesUpdateGenerations)) + for generationID := range seriesUpdateGenerations { + orderedGenerationIDs = append(orderedGenerationIDs, generationID) + } + sort.Slice(orderedGenerationIDs, func(i, j int) bool { + return orderedGenerationIDs[i] < orderedGenerationIDs[j] + }) + for _, genID := range orderedGenerationIDs { + stua = append(stua, seriesUpdateGenerations[genID]) + } + } pts[i] = packedTimeseries{ - metricName: metricName, - addrs: addrsByMetricName[metricName].addrs, + metricName: metricName, + addrs: addrsByMetricName[metricName].addrs, + updateAddrs: stua, } } rss.packedTimeseries = pts @@ -2009,7 +2330,7 @@ func (sn *storageNode) processSearchQuery(qt *querytracer.Tracer, requestData [] } return nil } - return sn.execOnConnWithPossibleRetry(qt, "search_v7", f, deadline) + return sn.execOnConnWithPossibleRetry(qt, "search_v8", f, deadline) } func (sn *storageNode) execOnConnWithPossibleRetry(qt *querytracer.Tracer, funcName string, f func(bc *handshake.BufferedConn) error, deadline searchutils.Deadline) error { diff --git a/app/vmselect/netstorage/netstorage_test.go b/app/vmselect/netstorage/netstorage_test.go index 2e3d303fcf..ab4c807ef2 100644 --- a/app/vmselect/netstorage/netstorage_test.go +++ b/app/vmselect/netstorage/netstorage_test.go @@ -5,6 +5,9 @@ import ( "reflect" "runtime" "testing" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" ) func TestInitStopNodes(t *testing.T) { @@ -47,7 +50,6 @@ func TestMergeSortBlocks(t *testing.T) { t.Fatalf("unexpected timestamps;\ngot\n%v\nwant\n%v", result.Timestamps, expectedResult.Timestamps) } } - // Zero blocks f(nil, 1, &Result{}) @@ -221,3 +223,238 @@ func TestMergeSortBlocks(t *testing.T) { Values: []float64{7, 24, 26}, }) } + +func TestMergeResult(t *testing.T) { + f := func(name string, dst, update, expect *Result) { + t.Helper() + t.Run(name, func(t *testing.T) { + mergeResult(dst, update) + if !reflect.DeepEqual(dst, expect) { + t.Fatalf(" unexpected result \ngot: \n%v\nwant: \n%v", dst, expect) + } + }) + } + + f("append and replace", + &Result{Timestamps: []int64{1, 2}, Values: []float64{5.0, 6.0}}, + &Result{Timestamps: []int64{2, 3}, Values: []float64{10.0, 30.0}}, + &Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 10.0, 30.0}}) + f("extend and replace", + &Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}}, + &Result{Timestamps: []int64{0, 1, 2}, Values: []float64{10.0, 15.0, 30.0}}, + &Result{Timestamps: []int64{0, 1, 2, 3}, Values: []float64{10.0, 15.0, 
30.0, 7.0}}) + f("update single point", + &Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}}, + &Result{Timestamps: []int64{15}, Values: []float64{35.0}}, + &Result{Timestamps: []int64{1, 2, 3, 15}, Values: []float64{5.0, 6.0, 7.0, 35.0}}) + f("extend", + &Result{Timestamps: []int64{1, 2, 3}, Values: []float64{5.0, 6.0, 7.0}}, + &Result{Timestamps: []int64{6, 7, 8}, Values: []float64{10.0, 15.0, 30.0}}, + &Result{Timestamps: []int64{1, 2, 3, 6, 7, 8}, Values: []float64{5, 6, 7, 10, 15, 30}}) + f("fast path", + &Result{}, + &Result{Timestamps: []int64{1, 2, 3}}, + &Result{}) + f("merge at the middle", + &Result{Timestamps: []int64{1, 2, 5, 6, 10, 15}, Values: []float64{.1, .2, .3, .4, .5, .6}}, + &Result{Timestamps: []int64{2, 6, 9, 10}, Values: []float64{1.1, 1.2, 1.3, 1.4}}, + &Result{Timestamps: []int64{1, 2, 6, 9, 10, 15}, Values: []float64{.1, 1.1, 1.2, 1.3, 1.4, 0.6}}) + + f("merge and re-allocate", + &Result{ + Timestamps: []int64{10, 20, 30, 50, 60, 90}, + Values: []float64{1.1, 1.2, 1.3, 1.4, 1.5, 1.6}, + }, + &Result{ + Timestamps: []int64{20, 30, 35, 45, 50, 55, 60}, + Values: []float64{2.0, 2.3, 2.35, 2.45, 2.5, 2.55, 2.6}, + }, + &Result{ + Timestamps: []int64{10, 20, 30, 35, 45, 50, 55, 60, 90}, + Values: []float64{1.1, 2.0, 2.3, 2.35, 2.45, 2.50, 2.55, 2.6, 1.6}, + }) +} + +func TestPackedTimeseries_Unpack(t *testing.T) { + createBlock := func(ts []int64, vs []int64) *storage.Block { + tsid := &storage.TSID{ + MetricID: 234211, + } + scale := int16(0) + precisionBits := uint8(8) + var b storage.Block + b.Init(tsid, ts, vs, scale, precisionBits) + _, _, _ = b.MarshalData(0, 0) + return &b + } + tr := storage.TimeRange{ + MinTimestamp: 0, + MaxTimestamp: 1<<63 - 1, + } + var mn storage.MetricName + mn.MetricGroup = []byte("foobar") + metricName := string(mn.Marshal(nil)) + type blockData struct { + timestamps []int64 + values []int64 + } + isValuesEqual := func(got, want []float64) bool { + equal := true + if len(got) != len(want) { + return false + } + for i, v := range want { + gotV := got[i] + if v == gotV { + continue + } + if decimal.IsStaleNaN(v) && decimal.IsStaleNaN(gotV) { + continue + } + equal = false + } + return equal + } + f := func(name string, dataBlocks []blockData, updateBlocks []blockData, wantResult *Result) { + t.Run(name, func(t *testing.T) { + + pts := packedTimeseries{ + metricName: metricName, + } + var dst Result + tbf := tmpBlocksFile{ + buf: make([]byte, 0, 20*1024*1024), + } + for _, dataBlock := range dataBlocks { + bb := createBlock(dataBlock.timestamps, dataBlock.values) + addr, err := tbf.WriteBlockData(storage.MarshalBlock(nil, bb), 0) + if err != nil { + t.Fatalf("cannot write block: %s", err) + } + pts.addrs = append(pts.addrs, addr) + } + var updateAddrs []tmpBlockAddr + for _, updateBlock := range updateBlocks { + bb := createBlock(updateBlock.timestamps, updateBlock.values) + addr, err := tbf.WriteBlockData(storage.MarshalBlock(nil, bb), 0) + if err != nil { + t.Fatalf("cannot write update block: %s", err) + } + updateAddrs = append(updateAddrs, addr) + } + if len(updateAddrs) > 0 { + pts.updateAddrs = append(pts.updateAddrs, updateAddrs) + } + + if err := pts.Unpack(&dst, []*tmpBlocksFile{&tbf}, tr); err != nil { + t.Fatalf("unexpected error at series unpack: %s", err) + } + if !reflect.DeepEqual(wantResult, &dst) && !isValuesEqual(wantResult.Values, dst.Values) { + t.Fatalf("unexpected result for unpack \nwant: \n%v\ngot: \n%v\n", wantResult, &dst) + } + }) + } + f("2 blocks without updates", + []blockData{ + { + 
timestamps: []int64{10, 15, 30}, + values: []int64{1, 2, 3}, + }, + { + timestamps: []int64{35, 40, 45}, + values: []int64{4, 5, 6}, + }, + }, + nil, + &Result{ + MetricName: mn, + Values: []float64{1, 2, 3, 4, 5, 6}, + Timestamps: []int64{10, 15, 30, 35, 40, 45}, + }) + f("2 blocks at the border of time range", + []blockData{ + { + timestamps: []int64{10, 15, 30}, + values: []int64{1, 2, 3}, + }, + { + timestamps: []int64{35, 40, 45}, + values: []int64{4, 5, 6}, + }, + }, + []blockData{ + { + timestamps: []int64{10}, + values: []int64{16}, + }, + }, + &Result{ + MetricName: mn, + Values: []float64{16, 2, 3, 4, 5, 6}, + Timestamps: []int64{10, 15, 30, 35, 40, 45}, + }) + f("2 blocks with update", + []blockData{ + { + timestamps: []int64{10, 15, 30}, + values: []int64{1, 2, 3}, + }, + { + timestamps: []int64{35, 40, 45}, + values: []int64{4, 5, 6}, + }, + }, + []blockData{ + { + timestamps: []int64{15, 30}, + values: []int64{11, 12}, + }, + }, + &Result{ + MetricName: mn, + Values: []float64{1, 11, 12, 4, 5, 6}, + Timestamps: []int64{10, 15, 30, 35, 40, 45}, + }) + f("2 blocks with 2 update blocks", + []blockData{ + { + timestamps: []int64{10, 15, 30}, + values: []int64{1, 2, 3}, + }, + { + timestamps: []int64{35, 40, 65}, + values: []int64{4, 5, 6}, + }, + }, + []blockData{ + { + timestamps: []int64{15, 30}, + values: []int64{11, 12}, + }, + { + timestamps: []int64{45, 55}, + values: []int64{21, 22}, + }, + }, + &Result{ + MetricName: mn, + Values: []float64{1, 11, 12, 21, 22, 6}, + Timestamps: []int64{10, 15, 30, 45, 55, 65}, + }) +} + +func TestPosition(t *testing.T) { + f := func(src []int64, value, wantPosition int64) { + t.Helper() + gotPos := position(src, value) + if wantPosition != int64(gotPos) { + t.Fatalf("incorrect position: \ngot:\n%d\nwant: \n%d", gotPos, wantPosition) + } + _ = src[int64(gotPos)] + } + f([]int64{1, 2, 3, 4}, 5, 3) + f([]int64{1, 2, 3, 4}, 0, 0) + f([]int64{1, 2, 3, 4}, 1, 0) + f([]int64{1, 2, 3, 4}, 4, 3) + f([]int64{1, 2, 3, 4}, 3, 2) +} diff --git a/app/vmselect/netstorage/netstorage_timing_test.go b/app/vmselect/netstorage/netstorage_timing_test.go index 2410b96663..27de5e1666 100644 --- a/app/vmselect/netstorage/netstorage_timing_test.go +++ b/app/vmselect/netstorage/netstorage_timing_test.go @@ -2,6 +2,7 @@ package netstorage import ( "fmt" + "reflect" "testing" ) @@ -105,3 +106,80 @@ func benchmarkMergeSortBlocks(b *testing.B, blocks []*sortBlock) { } }) } + +func BenchmarkMergeResults(b *testing.B) { + b.ReportAllocs() + f := func(name string, dst, update, expect *Result) { + if len(dst.Timestamps) != len(dst.Values) { + b.Fatalf("bad input data, timestamps and values lens must match") + } + if len(update.Values) != len(update.Timestamps) { + b.Fatalf("bad input data, update timestamp and values must match") + } + var toMerge Result + b.Run(name, func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + toMerge.reset() + toMerge.Values = append(toMerge.Values, dst.Values...) + toMerge.Timestamps = append(toMerge.Timestamps, dst.Timestamps...) 
+ mergeResult(&toMerge, update) + if !reflect.DeepEqual(&toMerge, expect) { + b.Fatalf("unexpected result, got: \n%v\nwant: \n%v", &toMerge, expect) + } + } + }) + } + f("update at the start", + &Result{ + Timestamps: []int64{10, 20, 30, 40, 50, 60, 90}, + Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9}, + }, + &Result{ + Timestamps: []int64{0, 20, 40}, + Values: []float64{0.0, 5.2, 5.4}, + }, + &Result{ + Timestamps: []int64{0, 20, 40, 50, 60, 90}, + Values: []float64{0.0, 5.2, 5.4, 2.5, 2.6, 2.9}, + }) + f("update at the end", + &Result{ + Timestamps: []int64{10, 20, 30, 40, 50, 60, 90}, + Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9}, + }, + &Result{ + Timestamps: []int64{50, 70, 100}, + Values: []float64{0.0, 5.7, 5.1}, + }, + &Result{ + Timestamps: []int64{10, 20, 30, 40, 50, 70, 100}, + Values: []float64{2.1, 2.2, 2.3, 2.4, 0.0, 5.7, 5.1}, + }) + f("update at the middle", + &Result{ + Timestamps: []int64{10, 20, 30, 40, 50, 60, 90}, + Values: []float64{2.1, 2.2, 2.3, 2.4, 2.5, 2.6, 2.9}, + }, + &Result{ + Timestamps: []int64{30, 40, 50, 60}, + Values: []float64{5.3, 5.4, 5.5, 5.6}, + }, + &Result{ + Timestamps: []int64{10, 20, 30, 40, 50, 60, 90}, + Values: []float64{2.1, 2.2, 5.3, 5.4, 5.5, 5.6, 2.9}, + }) + f("merge and re-allocate", + &Result{ + Timestamps: []int64{10, 20, 30, 50, 60, 90}, + Values: []float64{1.1, 1.2, 1.3, 1.4, 1.5, 1.6}, + }, + &Result{ + Timestamps: []int64{20, 30, 35, 45, 50, 55, 60}, + Values: []float64{2.0, 2.3, 2.35, 2.45, 2.5, 2.55, 2.6}, + }, + &Result{ + Timestamps: []int64{10, 20, 30, 35, 45, 50, 55, 60, 90}, + Values: []float64{1.1, 2.0, 2.3, 2.35, 2.45, 2.50, 2.55, 2.6, 1.6}, + }) +} diff --git a/app/vmselect/netstorage/upds.json b/app/vmselect/netstorage/upds.json new file mode 100644 index 0000000000..d204df4076 --- /dev/null +++ b/app/vmselect/netstorage/upds.json @@ -0,0 +1 @@ +{"metric":{"__name__":"promhttp_metric_handler_requests_total","job":"node_exporter-6","up":"true"},"values":[131],"timestamps":[1676050756785]} diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index 2ad4e854ee..4fa9fbfa0b 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -684,6 +684,48 @@ By default, `vminsert` tries to route all the samples for a single time series t * when `vmstorage` nodes are temporarily unavailable (for instance, during their restart). Then new samples are re-routed to the remaining available `vmstorage` nodes; * when `vmstorage` node has no enough capacity for processing incoming data stream. Then `vminsert` re-routes new samples to other `vmstorage` nodes. +## Alter/Update series + + VictoriaMetrics supports data modification and update with following limitations: +- modified data cannot be changed with back-filling. +- modified data must be sent to `vminsert` component. +- only json-line format is supported. + + How it works: +* Export series for modification with `/api/v1/export/prometheus` from `vmselect`. +* Modify values,timestamps with needed values. You can delete, add or modify timestamps and values. +* Send modified series to the `vminsert`'s API `/prometheus/api/v1/update/series` with POST request. +* `vminsert` adds unique monotonically increasing `__generation_id` label to each series during the update, so their samples could replace the original samples. +* at query requests `vmselect` merges original series with series updates sequentially according to their `__generation_id`. 
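
For illustration, here is a minimal end-to-end sketch of that flow. It assumes a cluster with `vmselect` reachable at `localhost:8481` and `vminsert` at `localhost:8480` (tenant `0`), and a hypothetical series selector; adjust the endpoints and the `match[]` filter to your setup:

```bash
# 1. Export the series that need to be corrected into a json-line file.
curl localhost:8481/select/0/prometheus/api/v1/export -g \
  -d 'match[]={__name__="my_metric",instance="host-1"}' \
  -d 'start=1658164969' -d 'end=1658165291' > series.jsonl

# 2. Edit series.jsonl: add, remove or change entries in the "values" and "timestamps" arrays.

# 3. Send the modified series back; vminsert attaches a new __generation_id label to them.
curl localhost:8480/insert/0/prometheus/api/v1/update/series -T series.jsonl
```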
+
+ How vmselect merges updates:
+* example 1 - delete timestamps at time range
+ original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
+ modified series timestamps: [20,50] values: [2,5]
+ query result series timestamps: [10,20,50] values: [1,2,5]
+* example 2 - modify values
+ original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
+ modified series timestamps: [20,30,40] values: [22,33,44]
+ query result series timestamps: [10,20,30,40,50] values: [1,22,33,44,5]
+* example 3 - modify timestamps
+ original series timestamps: [10,20,30,40,50] values: [1,2,3,4,5]
+ modified series timestamps: [0,5,10,15,20,25,30] values: [0.1,0.5,1,1.5,2,2.5,30]
+ query result series timestamps: [0,5,10,15,20,25,30,40,50] values: [0.1,0.5,1,1.5,2,2.5,30,4,5]
+
+ How to check which series were modified:
+* execute a metrics export request with the following query params:
+ * `reduce_mem_usage=true`
+ * `extra_filters={__generation_id!=""}`
+ Example request:
+```bash
+curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type="datadog"}' -d 'reduce_mem_usage=true' -d 'extra_filters={__generation_id!=""}'
+```
+ Example output:
+```json
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"datadog","instance":"localhost:8429","__generation_id":"1658153678907548000"},"values":[0,0,0,0,256,253,135,15],"timestamps":[1658153559703,1658153569703,1658153579703,1658153589703,1658153599703,1658153609703,1658153619703,1658153621703]}
+```
+ There was a single series modification request with `__generation_id="1658153678907548000"`.
+
 ## Backups
diff --git a/docs/guides/modify-series.md b/docs/guides/modify-series.md
new file mode 100644
index 0000000000..ac3ca16595
--- /dev/null
+++ b/docs/guides/modify-series.md
@@ -0,0 +1,138 @@
+# Series modification
+
+## Scenario
+
+VictoriaMetrics doesn't support direct data modification, since it uses immutable data structures and such operations may significantly reduce system performance.
+
+The series update API provides a workaround for this limitation: it allows overriding existing time series data points at runtime, during select requests.
+
+The following operations are supported:
+- add data points.
+- remove data points.
+- modify data points.
+
+Note that this is a low-level feature; in future releases data modification may also become available via scripts, vmctl, or `VMUI`.
+
+## Examples
+
+### Setup env
+
+It is expected that you have a configured VictoriaMetrics cluster with vminsert and vmselect components reachable from your computer.
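
Before running the examples, you can quickly check that both components respond; a sketch assuming the default ports `8480` (vminsert) and `8481` (vmselect):

```bash
# Both endpoints should return OK.
curl -s localhost:8480/health
curl -s localhost:8481/health
```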
+ +I'll work with the following data set, which was exported with a call to the export API: +```text +curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total"}' -d 'start=1658164769' -d 'end=1658165291' + +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"native","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} 
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"graphite","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"csvimport","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"promremotewrite","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"datadog","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +``` + +For better usability, it could be exported to a file on disk and modified via a preferred text editor. 
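
For example, a sketch that saves the export above into a file for editing (the file name is arbitrary):

```bash
curl localhost:8481/select/0/prometheus/api/v1/export -g \
  -d 'match[]={__name__="vmagent_rows_inserted_total"}' \
  -d 'start=1658164769' -d 'end=1658165291' > vmagent_rows_inserted_total.jsonl

# Edit the exported json lines with any text editor.
vi vmagent_rows_inserted_total.jsonl
```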
+ + +### Modify data points + +#### change values + +Let's say, during ingestion some error happened and producer incorrectly ingest value `0` for timestamp 1658164969982 and `Prometheus` and `influx` types: +```text +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164969982,1658164979982,1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165209982,1658165219982,1658165229982,1658165239982,1658165249982,1658165259982,1658165261982]} +``` + +we have to modify these values to correct `0` and send update request to the `vminsert` API: +```text +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0],"timestamps":[1658164969982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0],"timestamps":[1658164969982]} +``` + +data points could be also updated at time range if actual timestamp is not known. For instance, [1658164969972,1658164969982,1658164969992] timestamp range overwrite values for given timestamps and drops any timestamps at a given time range. + +Save 2 series above into the file `incorrect_value_modification.txt` and execute API request with the curl command: +```text +curl localhost:8480/insert/0/prometheus/api/v1/update/series -T incorrect_values_modification.txt +``` + +Check series modification output: +```text +curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"prometheus|influx"}' -d 'start=1658164969' -d 'end=1658164989' + +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429"},"values":[0,0,0],"timestamps":[1658164969982,1658164979982,1658164987982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0],"timestamps":[1658164969982,1658164979982,1658164987982]} +``` + +#### Add missing timestamps + +Missing timestamps could be added in the same way, specify needed timestamps with needed values at correct array indexes. + +### Delete data points at time range + +For example data set we have following time range from `1658164969982` to `1658165261982`. +Data points inside time range can be removed by skipping timestamps and time range, which must be removed. 
+For example, if the timestamps from `1658164999982` until `1658165099982` must be removed, omit them from the series you send (the exclude variant keeps the range boundaries, the include variant removes them as well):
+```text
+# exclude variant
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165099982,1658165109982]}
+
+# include variant
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429"},"values":[0,0],"timestamps":[1658164989982,1658165109982]}
+```
+
+Save one of the variants into the file `delete_datapoints_range.txt` and execute the following request to the API:
+```text
+curl localhost:8480/insert/0/prometheus/api/v1/update/series -T delete_datapoints_range.txt
+```
+
+Check the output:
+```text
+ curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"influx|opentsdb"}' -d 'start=1658164989' -d 'end=1658165209'
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
+{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]}
+```
+
+As you can see, the series with the `opentsdb` type has fewer data points than the `influx` one, since its data points were deleted in the given time range.
+
+### Observing changes
+
+Changes can be checked with an export API request using the special query params `reduce_mem_usage=true` and `extra_filters={__generation_id!=""}`.
+ +Let's observe changes from previous steps: +```text +curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total"}' -d 'reduce_mem_usage=true' -d 'extra_filters={__generation_id!=""}' + +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"prometheus","instance":"localhost:8429","__generation_id":"1658166029893830000"},"values":[0,0],"timestamps":[1658164969982,1658164979982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429","__generation_id":"1658166029893830000"},"values":[0,0],"timestamps":[1658164969982,1658164979982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"vmimport","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdbhttp","instance":"localhost:8429","__generation_id":"1658167040791371000"},"values":[0,0],"timestamps":[1658164989982,1658165109982]} +``` + + +### Rollback update operations + +Changes could be undone with metrics DELETE API, you have to specify correct `__generation_id`. +For example, rollback timestamps delete: +```text +curl http://localhost:8481/delete/0/prometheus/api/v1/admin/tsdb/delete_series -g -d 'match={__generation_id="1658167040791371000"}' +``` + +Check that changes were rolled back: + ```text +curl localhost:8481/select/0/prometheus/api/v1/export -g -d 'match[]={__name__="vmagent_rows_inserted_total",type=~"influx|opentsdb"}' -d 'start=1658164989' -d 'end=1658165209' +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"influx","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]} +{"metric":{"__name__":"vmagent_rows_inserted_total","job":"vminsert","type":"opentsdb","instance":"localhost:8429"},"values":[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0],"timestamps":[1658164989982,1658164999982,1658165009982,1658165019982,1658165029982,1658165039982,1658165049982,1658165059982,1658165069982,1658165079982,1658165089982,1658165099982,1658165109982,1658165119982,1658165129982,1658165139982,1658165149982,1658165159982,1658165169982,1658165179982,1658165189982,1658165199982,1658165207982]} +``` \ No newline at end of file diff --git a/lib/storage/metric_name.go b/lib/storage/metric_name.go index 39fc337282..b21d805faa 100644 --- a/lib/storage/metric_name.go +++ b/lib/storage/metric_name.go @@ -284,6 +284,28 @@ func (mn *MetricName) RemoveTag(tagKey string) { } } +// RemoveTagWithResult removes a tag with the given tagKey and returns removed Tag +func (mn *MetricName) RemoveTagWithResult(tagKey string) *Tag { + if tagKey == "__name__" { + mn.ResetMetricGroup() + return nil + } + tags := mn.Tags + mn.Tags = mn.Tags[:0] + var foundTag *Tag + for i := range tags { + tag := &tags[i] + if string(tag.Key) != tagKey { + 
mn.AddTagBytes(tag.Key, tag.Value) + continue + } + var t Tag + t.copyFrom(tag) + foundTag = &t + } + return foundTag +} + // RemoveTagsIgnoring removes all the tags included in ignoringTags. func (mn *MetricName) RemoveTagsIgnoring(ignoringTags []string) { if len(ignoringTags) == 0 { diff --git a/lib/storage/search.go b/lib/storage/search.go index 61c42e5e66..9c2c365268 100644 --- a/lib/storage/search.go +++ b/lib/storage/search.go @@ -57,12 +57,22 @@ type MetricBlock struct { // MetricName is metric name for the given Block. MetricName []byte + // GenerationID unique ID for series update, extracted from Tag __generation_id at runtime + GenerationID int64 + // Block is a block for the given MetricName Block Block } // Marshal marshals MetricBlock to dst func (mb *MetricBlock) Marshal(dst []byte) []byte { + dst = encoding.MarshalBytes(dst, mb.MetricName) + dst = encoding.MarshalInt64(dst, mb.GenerationID) + return MarshalBlock(dst, &mb.Block) +} + +// MarshalV7 marshals MetricBlock to dst at v7 api version +func (mb *MetricBlock) MarshalV7(dst []byte) []byte { dst = encoding.MarshalBytes(dst, mb.MetricName) return MarshalBlock(dst, &mb.Block) } @@ -92,7 +102,8 @@ func (mb *MetricBlock) Unmarshal(src []byte) ([]byte, error) { } mb.MetricName = append(mb.MetricName[:0], mn...) src = tail - + mb.GenerationID = encoding.UnmarshalInt64(src) + src = src[8:] return UnmarshalBlock(&mb.Block, src) } diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go index eb555ad7e9..b25f83e170 100644 --- a/lib/vmselectapi/server.go +++ b/lib/vmselectapi/server.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "strconv" "strings" "sync" "sync/atomic" @@ -558,6 +559,8 @@ func (s *Server) endConcurrentRequest() { func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error { switch rpcName { case "search_v7": + return s.processSearchV7(ctx) + case "search_v8": return s.processSearch(ctx) case "searchMetricNames_v3": return s.processSearchMetricNames(ctx) @@ -1037,12 +1040,76 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error { // Send found blocks to vmselect. blocksRead := 0 + mn := storage.GetMetricName() + defer storage.PutMetricName(mn) for bi.NextBlock(&ctx.mb) { blocksRead++ s.metricBlocksRead.Inc() s.metricRowsRead.Add(ctx.mb.Block.RowsCount()) + ctx.mb.GenerationID = 0 + mn.Reset() + if err := mn.Unmarshal(ctx.mb.MetricName); err != nil { + return fmt.Errorf("cannot unmarshal metricName: %q %w", ctx.mb.MetricName, err) + } + generationIDTag := mn.RemoveTagWithResult(`__generation_id`) + if generationIDTag != nil { + id, err := strconv.ParseInt(string(generationIDTag.Value), 10, 64) + if err != nil { + return fmt.Errorf("cannot parse __generation_id label value: %s : %w", generationIDTag.Value, err) + } + ctx.mb.GenerationID = id + ctx.mb.MetricName = mn.Marshal(ctx.mb.MetricName[:0]) + } ctx.dataBuf = ctx.mb.Marshal(ctx.dataBuf[:0]) + + if err := ctx.writeDataBufBytes(); err != nil { + return fmt.Errorf("cannot send MetricBlock: %w", err) + } + } + if err := bi.Error(); err != nil { + return fmt.Errorf("search error: %w", err) + } + ctx.qt.Printf("sent %d blocks to vmselect", blocksRead) + + // Send 'end of response' marker + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send 'end of response' marker") + } + return nil +} + +func (s *Server) processSearchV7(ctx *vmselectRequestCtx) error { + s.searchRequests.Inc() + + // Read request. + if err := ctx.readSearchQuery(); err != nil { + return err + } + + // Initiaialize the search. 
+ startTime := time.Now() + bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline) + if err != nil { + return ctx.writeErrorMessage(err) + } + s.indexSearchDuration.UpdateDuration(startTime) + defer bi.MustClose() + + // Send empty error message to vmselect. + if err := ctx.writeString(""); err != nil { + return fmt.Errorf("cannot send empty error message: %w", err) + } + + // Send found blocks to vmselect. + blocksRead := 0 + for bi.NextBlock(&ctx.mb) { + blocksRead++ + s.metricBlocksRead.Inc() + s.metricRowsRead.Add(ctx.mb.Block.RowsCount()) + + ctx.dataBuf = ctx.mb.MarshalV7(ctx.dataBuf[:0]) + if err := ctx.writeDataBufBytes(); err != nil { return fmt.Errorf("cannot send MetricBlock: %w", err) }