diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 5cc0f211a..c84351a28 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -28,6 +28,8 @@ The sandbox cluster installation is running under the constant load generated by ## tip +* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode zstd-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue comment](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992). + ## [v1.95.1](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.95.1) Released at 2023-11-16 diff --git a/lib/protoparser/promremotewrite/stream/streamparser.go b/lib/protoparser/promremotewrite/stream/streamparser.go index bbd8a1fab..485ccb850 100644 --- a/lib/protoparser/promremotewrite/stream/streamparser.go +++ b/lib/protoparser/promremotewrite/stream/streamparser.go @@ -54,7 +54,14 @@ func Parse(r io.Reader, isVMRemoteWrite bool, callback func(tss []prompb.TimeSer } else { bb.B, err = snappy.Decode(bb.B[:cap(bb.B)], ctx.reqBuf.B) if err != nil { - return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), err) + // Fall back to zstd decompression, since vmagent may send zstd-encoded messages + // without 'Content-Encoding: zstd' header if they were put into persistent queue before vmagent restart. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301#issuecomment-1815871992 + snappyErr := err + bb.B, err = zstd.Decompress(bb.B[:0], ctx.reqBuf.B) + if err != nil { + return fmt.Errorf("cannot decompress snappy-encoded request with length %d: %w", len(ctx.reqBuf.B), snappyErr) + } } } if int64(len(bb.B)) > maxInsertRequestSize.N { @@ -141,8 +148,10 @@ func putPushCtx(ctx *pushCtx) { } } -var pushCtxPool sync.Pool -var pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) +var ( + pushCtxPool sync.Pool + pushCtxPoolCh = make(chan *pushCtx, cgroup.AvailableCPUs()) +) func getWriteRequest() *prompb.WriteRequest { v := writeRequestPool.Get()