Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2022-04-05 19:24:37 +03:00
commit 40112df441
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
8 changed files with 97 additions and 31 deletions

View file

@ -30,8 +30,9 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
if err != nil {
return err
}
isGzip := req.Header.Get("Content-Encoding") == "gzip"
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(block *parser.Block) error {
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error {
return insertRows(at, block, extraLabels)
})
})

View file

@ -2,18 +2,17 @@
VictoriaMetrics command-line tool
vmctl provides various useful actions with VictoriaMetrics components.
Features:
- migrate data from [Prometheus](#migrating-data-from-prometheus) to VictoriaMetrics using snapshot API
- migrate data from [Thanos](#migrating-data-from-thanos) to VictoriaMetrics
- migrate data from [InfluxDB](#migrating-data-from-influxdb-1x) to VictoriaMetrics
- migrate data from [OpenTSDB](#migrating-data-from-opentsdb) to VictoriaMetrics
- migrate data between [VictoriaMetrics](#migrating-data-from-victoriametrics) single or cluster version.
- [verify](#verifying-exported-blocks-from-victoriametrics) exported blocks from VictoriaMetrics single or cluster version.
- [x] Prometheus: migrate data from Prometheus to VictoriaMetrics using snapshot API
- [x] Thanos: migrate data from Thanos to VictoriaMetrics
- [ ] ~~Prometheus: migrate data from Prometheus to VictoriaMetrics by query~~(discarded)
- [x] InfluxDB: migrate data from InfluxDB to VictoriaMetrics
- [x] OpenTSDB: migrate data from OpenTSDB to VictoriaMetrics
- [ ] Storage Management: data re-balancing between nodes
vmctl acts as a proxy between data source ([Prometheus](#migrating-data-from-prometheus),
[InfluxDB](#migrating-data-from-influxdb-1x), [VictoriaMetrics](##migrating-data-from-victoriametrics), etc.)
and destination - VictoriaMetrics single or cluster version. To see the full list of supported modes
To see the full list of supported modes
run the following command:
```bash
@ -29,6 +28,7 @@ COMMANDS:
influx Migrate timeseries from InfluxDB
prometheus Migrate timeseries from Prometheus
vm-native Migrate time series between VictoriaMetrics installations via native binary format
verify-block Verifies correctness of data blocks exported via VictoriaMetrics Native format. See https://docs.victoriametrics.com/#how-to-export-data-in-native-format
```
Each mode has its own unique set of flags specific (e.g. prefixed with `influx` for influx mode)
@ -508,6 +508,21 @@ Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/
4. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
and specify `accountID` param.
## Verifying exported blocks from VictoriaMetrics
In this mode, `vmctl` allows verifying correctness and integrity of data exported via [native format](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-export-data-in-native-format) from VictoriaMetrics.
You can verify exported data at disk before uploading it by `vmctl verify-block` command:
```bash
# export blocks from VictoriaMetrics
curl localhost:8428/api/v1/export/native -g -d 'match[]={__name__!=""}' -o exported_data_block
# verify block content
./vmctl verify-block exported_data_block
2022/03/30 18:04:50 verifying block at path="exported_data_block"
2022/03/30 18:04:50 successfully verified block at path="exported_data_block", blockCount=123786
2022/03/30 18:04:50 Total time: 100.108ms
```
## Tuning
### InfluxDB mode

View file

@ -6,6 +6,7 @@ import (
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
@ -14,6 +15,8 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmctl/vm"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/native"
"github.com/urfave/cli/v2"
)
@ -164,6 +167,39 @@ func main() {
return p.run()
},
},
{
Name: "verify-block",
Usage: "Verifies exported block with VictoriaMetrics Native format",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "gunzip",
Usage: "Use GNU zip decompression for exported block",
Value: false,
},
},
Action: func(c *cli.Context) error {
common.StartUnmarshalWorkers()
blockPath := c.Args().First()
isBlockGzipped := c.Bool("gunzip")
if len(blockPath) == 0 {
return cli.Exit("you must provide path for exported data block", 1)
}
log.Printf("verifying block at path=%q", blockPath)
f, err := os.OpenFile(blockPath, os.O_RDONLY, 0600)
if err != nil {
return cli.Exit(fmt.Errorf("cannot open exported block at path=%q err=%w", blockPath, err), 1)
}
var blocksCount uint64
if err := parser.ParseStream(f, isBlockGzipped, func(block *parser.Block) error {
atomic.AddUint64(&blocksCount, 1)
return nil
}); err != nil {
return cli.Exit(fmt.Errorf("cannot parse block at path=%q, blocksCount=%d, err=%w", blockPath, blocksCount, err), 1)
}
log.Printf("successfully verified block at path=%q, blockCount=%d", blockPath, blocksCount)
return nil
},
},
},
}

View file

@ -27,8 +27,9 @@ func InsertHandler(req *http.Request) error {
if err != nil {
return err
}
isGzip := req.Header.Get("Content-Encoding") == "gzip"
return writeconcurrencylimiter.Do(func() error {
return parser.ParseStream(req, func(block *parser.Block) error {
return parser.ParseStream(req.Body, isGzip, func(block *parser.Block) error {
return insertRows(block, extraLabels)
})
})

View file

@ -15,9 +15,11 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl.html): add ability to verify files obtained via [native export](https://docs.victoriametrics.com/#how-to-export-data-in-native-format). See [these docs](https://docs.victoriametrics.com/vmctl.html#verifying-exported-blocks-from-victoriametrics) and [this feature request](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2362).
* FEATURE: [vmui](https://docs.victoriametrics.com/#vmui): add pre-defined dasbhoards for per-job CPU usage, memory usage and disk IO usage. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2243) for details.
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): improve compatibility with [Prometheus Alert Generator specification](https://github.com/prometheus/compliance/blob/main/alert_generator/specification.md). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/2340).
* FEATURE: [vmalert](https://docs.victoriametrics.com/vmalert.html): add `-datasource.disableKeepAlive` command-line flag, which can be used for disabling [HTTP keep-alive connections](https://en.wikipedia.org/wiki/HTTP_persistent_connection) to datasources. This option can be useful for distributing load among multiple datasources behind TCP proxy such as [HAProxy](http://www.haproxy.org/).
* FEATURE: [Cluster version of VictoriaMetrics](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): reduce memory usage by up to 50% for `vminsert` and `vmstorage` under high ingestion rate.
* FEATURE: [vmgateway](https://docs.victoriametrics.com/vmgateway.html): Allow to read `-ratelimit.config` file from URL. Also add `-atelimit.configCheckInterval` command-line option. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2241).
* FEATURE: add the following command-line flags, which can be used for fine-grained limiting of CPU and memory usage during various API calls:

View file

@ -6,18 +6,17 @@ sort: 8
VictoriaMetrics command-line tool
vmctl provides various useful actions with VictoriaMetrics components.
Features:
- migrate data from [Prometheus](#migrating-data-from-prometheus) to VictoriaMetrics using snapshot API
- migrate data from [Thanos](#migrating-data-from-thanos) to VictoriaMetrics
- migrate data from [InfluxDB](#migrating-data-from-influxdb-1x) to VictoriaMetrics
- migrate data from [OpenTSDB](#migrating-data-from-opentsdb) to VictoriaMetrics
- migrate data between [VictoriaMetrics](#migrating-data-from-victoriametrics) single or cluster version.
- [verify](#verifying-exported-blocks-from-victoriametrics) exported blocks from VictoriaMetrics single or cluster version.
- [x] Prometheus: migrate data from Prometheus to VictoriaMetrics using snapshot API
- [x] Thanos: migrate data from Thanos to VictoriaMetrics
- [ ] ~~Prometheus: migrate data from Prometheus to VictoriaMetrics by query~~(discarded)
- [x] InfluxDB: migrate data from InfluxDB to VictoriaMetrics
- [x] OpenTSDB: migrate data from OpenTSDB to VictoriaMetrics
- [ ] Storage Management: data re-balancing between nodes
vmctl acts as a proxy between data source ([Prometheus](#migrating-data-from-prometheus),
[InfluxDB](#migrating-data-from-influxdb-1x), [VictoriaMetrics](##migrating-data-from-victoriametrics), etc.)
and destination - VictoriaMetrics single or cluster version. To see the full list of supported modes
To see the full list of supported modes
run the following command:
```bash
@ -33,6 +32,7 @@ COMMANDS:
influx Migrate timeseries from InfluxDB
prometheus Migrate timeseries from Prometheus
vm-native Migrate time series between VictoriaMetrics installations via native binary format
verify-block Verifies correctness of data blocks exported via VictoriaMetrics Native format. See https://docs.victoriametrics.com/#how-to-export-data-in-native-format
```
Each mode has its own unique set of flags specific (e.g. prefixed with `influx` for influx mode)
@ -512,6 +512,21 @@ Instead, use [relabeling in VictoriaMetrics](https://github.com/VictoriaMetrics/
4. When importing in or from cluster version remember to use correct [URL format](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html#url-format)
and specify `accountID` param.
## Verifying exported blocks from VictoriaMetrics
In this mode, `vmctl` allows verifying correctness and integrity of data exported via [native format](https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-export-data-in-native-format) from VictoriaMetrics.
You can verify exported data at disk before uploading it by `vmctl verify-block` command:
```bash
# export blocks from VictoriaMetrics
curl localhost:8428/api/v1/export/native -g -d 'match[]={__name__!=""}' -o exported_data_block
# verify block content
./vmctl verify-block exported_data_block
2022/03/30 18:04:50 verifying block at path="exported_data_block"
2022/03/30 18:04:50 successfully verified block at path="exported_data_block", blockCount=123786
2022/03/30 18:04:50 Total time: 100.108ms
```
## Tuning
### InfluxDB mode

View file

@ -4,7 +4,6 @@ import (
"bufio"
"fmt"
"io"
"net/http"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@ -17,12 +16,11 @@ import (
// ParseStream parses /api/v1/import/native lines from req and calls callback for parsed blocks.
//
// The callback can be called concurrently multiple times for streamed data from req.
// The callback can be called concurrently multiple times for streamed data from r.
//
// callback shouldn't hold block after returning.
func ParseStream(req *http.Request, callback func(block *Block) error) error {
r := req.Body
if req.Header.Get("Content-Encoding") == "gzip" {
func ParseStream(r io.Reader, isGzip bool, callback func(block *Block) error) error {
if isGzip {
zr, err := common.GetGzipReader(r)
if err != nil {
return fmt.Errorf("cannot read gzipped vmimport data: %w", err)

View file

@ -17,7 +17,7 @@ const (
whole = 2
)
const defaultExpireDuration = 20 * time.Minute
const defaultExpireDuration = 10 * time.Minute
// Cache is a cache for working set entries.
//
@ -149,8 +149,6 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
prev := c.prev.Load().(*fastcache.Cache)
prev.Reset()
curr := c.curr.Load().(*fastcache.Cache)
var cs fastcache.Stats
curr.UpdateStats(&cs)
c.prev.Store(curr)
// Use c.maxBytes/2 instead of cs.MaxBytesSize for creating new cache,
// since cs.MaxBytesSize may not match c.maxBytes/2, so the created cache
@ -162,7 +160,7 @@ func (c *Cache) expirationWatcher(expireDuration time.Duration) {
}
func (c *Cache) cacheSizeWatcher() {
t := time.NewTicker(time.Minute)
t := time.NewTicker(1500 * time.Millisecond)
defer t.Stop()
var maxBytesSize uint64