mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/protoparser/promremotewrite: fall back to zstd decoding if Snappy-decoding fails (#5344)
This case is possible after the following steps:
1. vmagent successfully performed handshake with the -remoteWrite.url and the remote storage supports zstd-compressed data.
2. The remote storage becomes unavailable or slow to ingest data, so vmagent compresses the collected data into blocks with zstd and puts these blocks into the persistent queue on disk.
3. vmagent restarts and the remote storage is unavailable during the handshake, then vmagent falls back to Snappy compression.
4. vmagent starts sending zstd-compressed data from the persistent queue to the remote storage, while falsely advertising that it sends Snappy-compressed data.
5. The remote storage receives zstd-compressed data and fails unpacking it with Snappy.
The solution is the same as in 12cd32fd75: just fall back to zstd decompression if Snappy decompression fails.
This commit is contained in:
parent
34a26397d7
commit
ae3107153c
2 changed files with 14 additions and 3 deletions
|
@ -28,6 +28,8 @@ The sandbox cluster installation is running under the constant load generated by
|
||||||
|
|
||||||
## tip
|
## tip
|
||||||
|
|
||||||
|
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992).
|
||||||
|
|
||||||
## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1)
|
## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1)
|
||||||
|
|
||||||
Released at 2023-11-16
|
Released at 2023-11-16
|
||||||
|
|
|
@ -54,7 +54,14 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer
|
||||||
} else {
|
} else {
|
||||||
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
|
bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), err)
|
// Fall back to zstd decompression, since vmagent may send zstd-encoded messages
|
||||||
|
// without 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart.
|
||||||
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992
|
||||||
|
snappyErr := err
|
||||||
|
bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), snappyErr)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if int64(len(bb.B)) > maxInsertRequestSize.N {
|
if int64(len(bb.B)) > maxInsertRequestSize.N {
|
||||||
|
@ -141,8 +148,10 @@ func putPushCtx(ctx *pushCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var pushCtxPool sync.Pool
|
var (
|
||||||
var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
|
pushCtxPool sync.Pool
|
||||||
|
pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs())
|
||||||
|
)
|
||||||
|
|
||||||
func getWriteRequest() *prompb.WriteRequest {
|
func getWriteRequest() *prompb.WriteRequest {
|
||||||
v := writeRequestPool.Get()
|
v := writeRequestPool.Get()
|
||||||
|
|
Loading…
Reference in a new issue