From 50f790b5d7c9d8956e98b459c90b80de54c26be8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 25 Mar 2021 21:03:29 +0200 Subject: [PATCH 01/18] docs/Cluster-VictoriaMetrics.md: mention that vmselect doesnt serve partial responses from export API Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1148 --- docs/Cluster-VictoriaMetrics.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/Cluster-VictoriaMetrics.md b/docs/Cluster-VictoriaMetrics.md index ffaf85bad..68b729cca 100644 --- a/docs/Cluster-VictoriaMetrics.md +++ b/docs/Cluster-VictoriaMetrics.md @@ -272,6 +272,8 @@ the update process. See [cluster availability](#cluster-availability) section fo - `vminsert` re-routes incoming data from unavailable `vmstorage` nodes to healthy `vmstorage` nodes - `vmselect` continues serving partial responses if at least a single `vmstorage` node is available. If consistency over availability is preferred, then either pass `-search.denyPartialResponse` command-line flag to `vmselect` or pass `deny_partial_response=1` query arg in requests to `vmselect`. +`vmselect` doesn't serve partial responses for API handlers returning raw datapoints - [`/api/v1/export*` endpoints](https://victoriametrics.github.io/#how-to-export-time-series), since users usually expect this data is always complete. + Data replication can be used for increasing storage durability. See [these docs](#replication-and-data-safety) for details. From aa81039b42b1ebf95250ba7e27b463e54164f7c7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 25 Mar 2021 21:30:41 +0200 Subject: [PATCH 02/18] app/vmselect: log the metric which trigger rollup result cache reset This should help finding the source of stale metrics --- app/vmselect/promql/rollup_result_cache.go | 9 +++++++++ lib/storage/storage.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/app/vmselect/promql/rollup_result_cache.go b/app/vmselect/promql/rollup_result_cache.go index 06ee3f24b..de598801b 100644 --- a/app/vmselect/promql/rollup_result_cache.go +++ b/app/vmselect/promql/rollup_result_cache.go @@ -29,12 +29,16 @@ var ( // ResetRollupResultCacheIfNeeded resets rollup result cache if mrs contains timestamps outside `now - search.cacheTimestampOffset`. func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) { checkRollupResultCacheResetOnce.Do(func() { + rollupResultResetMetricRowSample.Store(&storage.MetricRow{}) go checkRollupResultCacheReset() }) minTimestamp := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() + checkRollupResultCacheResetInterval.Milliseconds() needCacheReset := false for i := range mrs { if mrs[i].Timestamp < minTimestamp { + var mr storage.MetricRow + mr.CopyFrom(&mrs[i]) + rollupResultResetMetricRowSample.Store(&mr) needCacheReset = true break } @@ -49,6 +53,10 @@ func checkRollupResultCacheReset() { for { time.Sleep(checkRollupResultCacheResetInterval) if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 { + mr := rollupResultResetMetricRowSample.Load().(*storage.MetricRow) + d := int64(fasttime.UnixTimestamp()*1000) - mr.Timestamp - cacheTimestampOffset.Milliseconds() + logger.Warnf("resetting rollup result cache because the metric %s has a timestamp older than -search.cacheTimestampOffset=%s by %.3fs", + mr.String(), cacheTimestampOffset, float64(d)/1e3) ResetRollupResultCache() } } @@ -58,6 +66,7 @@ const checkRollupResultCacheResetInterval = 5 * time.Second var needRollupResultCacheReset uint32 var checkRollupResultCacheResetOnce sync.Once +var rollupResultResetMetricRowSample atomic.Value var rollupResultCacheV = &rollupResultCache{ c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing. diff --git a/lib/storage/storage.go b/lib/storage/storage.go index a2884dd4d..48af275c1 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -1272,7 +1272,7 @@ func (mr *MetricRow) String() string { if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil { metricName = mn.String() } - return fmt.Sprintf("MetricName=%s, Timestamp=%d, Value=%f\n", metricName, mr.Timestamp, mr.Value) + return fmt.Sprintf("%s (Timestamp=%d, Value=%f)", metricName, mr.Timestamp, mr.Value) } // Marshal appends marshaled mr to dst and returns the result. From 9761ffd1611b1268bda22061a41cc04cbdf3b6ac Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 26 Mar 2021 12:28:10 +0200 Subject: [PATCH 03/18] lib/promscrape/discovery/kubernetes: properly handle `too old resource version` error message from Kubernetes watch API --- docs/CHANGELOG.md | 2 + .../discovery/kubernetes/api_watcher.go | 54 ++++++++++++++----- 2 files changed, 43 insertions(+), 13 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 22727f9d2..aef0e210e 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -11,10 +11,12 @@ * FEATURE: accept and enforce `extra_label==` query arg at [Graphite APIs](https://victoriametrics.github.io/#graphite-api-usage). * FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139 * FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144). +* FEATURE: vmagent: add `vm_promscrape_discovery_kubernetes_stale_resource_versions_total` metric for monitoring the frequency of `too old resource version` errors during Kubernetes service discovery. * BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars. * BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars. * BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141 +* BUGFIX: vmagent: properly handle `too old resource version` error messages from Kubernetes watch API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1150 # [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index d0cc9bb6c..68902587a 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -308,10 +308,11 @@ type urlWatcher struct { resourceVersion string - objectsCount *metrics.Counter - objectsAdded *metrics.Counter - objectsRemoved *metrics.Counter - objectsUpdated *metrics.Counter + objectsCount *metrics.Counter + objectsAdded *metrics.Counter + objectsRemoved *metrics.Counter + objectsUpdated *metrics.Counter + staleResourceVersions *metrics.Counter } func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { @@ -329,10 +330,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { awsPending: make(map[*apiWatcher]struct{}), objectsByKey: make(map[string]object), - objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), - objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), - objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), - objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), + objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), + objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), + objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), + objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), + staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)), } logger.Infof("started %s watcher for %q", uw.role, uw.apiURL) go uw.watchForUpdates() @@ -502,14 +504,15 @@ func (uw *urlWatcher) watchForUpdates() { continue } if resp.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(resp.Body) - _ = resp.Body.Close() - logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) if resp.StatusCode == 410 { // There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses backoffDelay = time.Second + uw.staleResourceVersions.Inc() uw.setResourceVersion("") } else { + body, _ := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body) backoffSleep() } continue @@ -580,13 +583,25 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err) } uw.setResourceVersion(bm.Metadata.ResourceVersion) + case "ERROR": + em, err := parseError(we.Object) + if err != nil { + return fmt.Errorf("cannot parse error message for from %q: %w", we.Object, err) + } + if em.Code == 410 { + // See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses + uw.staleResourceVersions.Inc() + uw.setResourceVersion("") + return nil + } + return fmt.Errorf("unexpected error message: %q", we.Object) default: - return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) + return fmt.Errorf("unexpected WatchEvent type %q: %q", we.Type, we.Object) } } } -// Bookmark is a bookmark from Kubernetes Watch API. +// Bookmark is a bookmark message from Kubernetes Watch API. // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks type Bookmark struct { Metadata struct { @@ -602,6 +617,19 @@ func parseBookmark(data []byte) (*Bookmark, error) { return &bm, nil } +// Error is an error message from Kubernetes Watch API. +type Error struct { + Code int +} + +func parseError(data []byte) (*Error, error) { + var em Error + if err := json.Unmarshal(data, &em); err != nil { + return nil, err + } + return &em, nil +} + func getAPIPaths(role string, namespaces []string, selectors []Selector) []string { objectName := getObjectNameByRole(role) if objectName == "nodes" || len(namespaces) == 0 { From f39c84b21f469042eb2584331f81865a1568cbb5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 26 Mar 2021 12:45:59 +0200 Subject: [PATCH 04/18] lib/promscrape/discovery/kubernetes: typo fix in error message --- lib/promscrape/discovery/kubernetes/api_watcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go index 68902587a..2e1bfe0cc 100644 --- a/lib/promscrape/discovery/kubernetes/api_watcher.go +++ b/lib/promscrape/discovery/kubernetes/api_watcher.go @@ -586,7 +586,7 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error { case "ERROR": em, err := parseError(we.Object) if err != nil { - return fmt.Errorf("cannot parse error message for from %q: %w", we.Object, err) + return fmt.Errorf("cannot parse error message from %q: %w", we.Object, err) } if em.Code == 410 { // See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses From 1b7dc1e5a5537362234b8660da51ee9887696d9c Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 26 Mar 2021 14:17:59 +0300 Subject: [PATCH 05/18] Adds blocks drop (#1151) * adds blocks drop at 400 BadRequest status code recieved from remote storage, not expected that remote storage will be able to handle it on retry * removes error logging for dropped blocks, its expected error --- app/vmagent/remotewrite/client.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 767ff549b..4596ac261 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -259,13 +259,14 @@ again: return true } metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() - if statusCode == 409 { + if statusCode == 409 || statusCode == 400 { // Just drop block on 409 status code like Prometheus does. // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873 - body, _ := ioutil.ReadAll(resp.Body) + // drop block on 400 status code, + // not expected that remote server will be able to handle it on retry + // should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149 + _, _ = ioutil.ReadAll(resp.Body) _ = resp.Body.Close() - logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+ - "response body=%q", len(block), c.sanitizedURL, statusCode, body) c.packetsDropped.Inc() return true } From 8fc8ef1aba5c94b0ba61b3325b35e8cb28bb41a5 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 26 Mar 2021 13:55:35 +0200 Subject: [PATCH 06/18] vendor: update github.com/klauspost/compress from v1.11.12 to v1.11.13 --- go.mod | 2 +- go.sum | 3 +- .../klauspost/compress/flate/deflate.go | 6 +- .../klauspost/compress/flate/fast_encoder.go | 26 +- .../compress/flate/huffman_bit_writer.go | 39 +- .../klauspost/compress/flate/huffman_code.go | 55 +- .../klauspost/compress/flate/level2.go | 2 +- .../klauspost/compress/fse/compress.go | 10 +- .../klauspost/compress/fse/decompress.go | 4 +- .../klauspost/compress/huff0/compress.go | 4 +- .../klauspost/compress/snappy/snappy.go | 2 +- .../klauspost/compress/zstd/blockenc.go | 42 +- .../klauspost/compress/zstd/enc_base.go | 40 +- .../klauspost/compress/zstd/enc_best.go | 1 + .../klauspost/compress/zstd/enc_better.go | 591 +++++++++++++++++- .../klauspost/compress/zstd/enc_dfast.go | 414 +++++++++++- .../klauspost/compress/zstd/enc_fast.go | 371 ++++++++++- .../klauspost/compress/zstd/encoder.go | 10 +- .../compress/zstd/encoder_options.go | 38 +- .../klauspost/compress/zstd/fse_encoder.go | 12 +- .../klauspost/compress/zstd/snappy.go | 2 +- vendor/modules.txt | 2 +- 22 files changed, 1543 insertions(+), 133 deletions(-) diff --git a/go.mod b/go.mod index 6dbebd09c..55ee81ee5 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/fatih/color v1.10.0 // indirect github.com/golang/snappy v0.0.3 github.com/influxdata/influxdb v1.8.4 - github.com/klauspost/compress v1.11.12 + github.com/klauspost/compress v1.11.13 github.com/mattn/go-runewidth v0.0.10 // indirect github.com/prometheus/client_golang v1.10.0 // indirect github.com/prometheus/common v0.20.0 // indirect diff --git a/go.sum b/go.sum index 5cb2599f2..685b72025 100644 --- a/go.sum +++ b/go.sum @@ -514,8 +514,9 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0 github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4= +github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= diff --git a/vendor/github.com/klauspost/compress/flate/deflate.go b/vendor/github.com/klauspost/compress/flate/deflate.go index 25dbe3e15..3f428d8b5 100644 --- a/vendor/github.com/klauspost/compress/flate/deflate.go +++ b/vendor/github.com/klauspost/compress/flate/deflate.go @@ -645,15 +645,15 @@ func (d *compressor) init(w io.Writer, level int) (err error) { d.fill = (*compressor).fillBlock d.step = (*compressor).store case level == ConstantCompression: - d.w.logNewTablePenalty = 4 - d.window = make([]byte, maxStoreBlockSize) + d.w.logNewTablePenalty = 8 + d.window = make([]byte, 32<<10) d.fill = (*compressor).fillBlock d.step = (*compressor).storeHuff case level == DefaultCompression: level = 5 fallthrough case level >= 1 && level <= 6: - d.w.logNewTablePenalty = 6 + d.w.logNewTablePenalty = 8 d.fast = newFastEnc(level) d.window = make([]byte, maxStoreBlockSize) d.fill = (*compressor).fillBlock diff --git a/vendor/github.com/klauspost/compress/flate/fast_encoder.go b/vendor/github.com/klauspost/compress/flate/fast_encoder.go index 4a73e1bdd..678f08105 100644 --- a/vendor/github.com/klauspost/compress/flate/fast_encoder.go +++ b/vendor/github.com/klauspost/compress/flate/fast_encoder.go @@ -6,6 +6,7 @@ package flate import ( + "encoding/binary" "fmt" "math/bits" ) @@ -65,26 +66,15 @@ func load32(b []byte, i int) uint32 { } func load64(b []byte, i int) uint64 { - // Help the compiler eliminate bounds checks on the read so it can be done in a single read. - b = b[i:] - b = b[:8] - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return binary.LittleEndian.Uint64(b[i:]) } func load3232(b []byte, i int32) uint32 { - // Help the compiler eliminate bounds checks on the read so it can be done in a single read. - b = b[i:] - b = b[:4] - return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24 + return binary.LittleEndian.Uint32(b[i:]) } func load6432(b []byte, i int32) uint64 { - // Help the compiler eliminate bounds checks on the read so it can be done in a single read. - b = b[i:] - b = b[:8] - return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 | - uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56 + return binary.LittleEndian.Uint64(b[i:]) } func hash(u uint32) uint32 { @@ -225,9 +215,9 @@ func (e *fastGen) Reset() { func matchLen(a, b []byte) int { b = b[:len(a)] var checked int - if len(a) > 4 { + if len(a) >= 4 { // Try 4 bytes first - if diff := load32(a, 0) ^ load32(b, 0); diff != 0 { + if diff := binary.LittleEndian.Uint32(a) ^ binary.LittleEndian.Uint32(b); diff != 0 { return bits.TrailingZeros32(diff) >> 3 } // Switch to 8 byte matching. @@ -236,7 +226,7 @@ func matchLen(a, b []byte) int { b = b[4:] for len(a) >= 8 { b = b[:len(a)] - if diff := load64(a, 0) ^ load64(b, 0); diff != 0 { + if diff := binary.LittleEndian.Uint64(a) ^ binary.LittleEndian.Uint64(b); diff != 0 { return checked + (bits.TrailingZeros64(diff) >> 3) } checked += 8 @@ -247,7 +237,7 @@ func matchLen(a, b []byte) int { b = b[:len(a)] for i := range a { if a[i] != b[i] { - return int(i) + checked + return i + checked } } return len(a) + checked diff --git a/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go b/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go index 208d66711..db54be139 100644 --- a/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go +++ b/vendor/github.com/klauspost/compress/flate/huffman_bit_writer.go @@ -5,6 +5,7 @@ package flate import ( + "encoding/binary" "io" ) @@ -206,7 +207,7 @@ func (w *huffmanBitWriter) write(b []byte) { } func (w *huffmanBitWriter) writeBits(b int32, nb uint16) { - w.bits |= uint64(b) << (w.nbits & reg16SizeMask64) + w.bits |= uint64(b) << w.nbits w.nbits += nb if w.nbits >= 48 { w.writeOutBits() @@ -420,13 +421,11 @@ func (w *huffmanBitWriter) writeOutBits() { w.bits >>= 48 w.nbits -= 48 n := w.nbytes - w.bytes[n] = byte(bits) - w.bytes[n+1] = byte(bits >> 8) - w.bytes[n+2] = byte(bits >> 16) - w.bytes[n+3] = byte(bits >> 24) - w.bytes[n+4] = byte(bits >> 32) - w.bytes[n+5] = byte(bits >> 40) + + // We over-write, but faster... + binary.LittleEndian.PutUint64(w.bytes[n:], bits) n += 6 + if n >= bufferFlushSize { if w.err != nil { n = 0 @@ -435,6 +434,7 @@ func (w *huffmanBitWriter) writeOutBits() { w.write(w.bytes[:n]) n = 0 } + w.nbytes = n } @@ -759,7 +759,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) } else { // inlined c := lengths[lengthCode&31] - w.bits |= uint64(c.code) << (w.nbits & reg16SizeMask64) + w.bits |= uint64(c.code) << w.nbits w.nbits += c.len if w.nbits >= 48 { w.writeOutBits() @@ -779,7 +779,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode) } else { // inlined c := offs[offsetCode&31] - w.bits |= uint64(c.code) << (w.nbits & reg16SizeMask64) + w.bits |= uint64(c.code) << w.nbits w.nbits += c.len if w.nbits >= 48 { w.writeOutBits() @@ -830,8 +830,8 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) { // Assume header is around 70 bytes: // https://stackoverflow.com/a/25454430 const guessHeaderSizeBits = 70 * 8 - estBits, estExtra := histogramSize(input, w.literalFreq[:], !eof && !sync) - estBits += w.lastHeader + 15 + estBits := histogramSize(input, w.literalFreq[:], !eof && !sync) + estBits += w.lastHeader + len(input)/32 if w.lastHeader == 0 { estBits += guessHeaderSizeBits } @@ -845,9 +845,9 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) { return } + reuseSize := 0 if w.lastHeader > 0 { - reuseSize := w.literalEncoding.bitLength(w.literalFreq[:256]) - estBits += estExtra + reuseSize = w.literalEncoding.bitLength(w.literalFreq[:256]) if estBits < reuseSize { // We owe an EOB @@ -859,6 +859,10 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) { const numLiterals = endBlockMarker + 1 const numOffsets = 1 if w.lastHeader == 0 { + if !eof && !sync { + // Generate a slightly suboptimal tree that can be used for all. + fillHist(w.literalFreq[:numLiterals]) + } w.literalFreq[endBlockMarker] = 1 w.literalEncoding.generate(w.literalFreq[:numLiterals], 15) @@ -878,19 +882,14 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) { for _, t := range input { // Bitwriting inlined, ~30% speedup c := encoding[t] - w.bits |= uint64(c.code) << ((w.nbits) & reg16SizeMask64) + w.bits |= uint64(c.code) << w.nbits w.nbits += c.len if w.nbits >= 48 { bits := w.bits w.bits >>= 48 w.nbits -= 48 n := w.nbytes - w.bytes[n] = byte(bits) - w.bytes[n+1] = byte(bits >> 8) - w.bytes[n+2] = byte(bits >> 16) - w.bytes[n+3] = byte(bits >> 24) - w.bytes[n+4] = byte(bits >> 32) - w.bytes[n+5] = byte(bits >> 40) + binary.LittleEndian.PutUint64(w.bytes[n:], bits) n += 6 if n >= bufferFlushSize { if w.err != nil { diff --git a/vendor/github.com/klauspost/compress/flate/huffman_code.go b/vendor/github.com/klauspost/compress/flate/huffman_code.go index 4c39a3018..0d3445a1c 100644 --- a/vendor/github.com/klauspost/compress/flate/huffman_code.go +++ b/vendor/github.com/klauspost/compress/flate/huffman_code.go @@ -122,6 +122,16 @@ func (h *huffmanEncoder) bitLength(freq []uint16) int { return total } +func (h *huffmanEncoder) bitLengthRaw(b []byte) int { + var total int + for _, f := range b { + if f != 0 { + total += int(h.codes[f].len) + } + } + return total +} + // Return the number of literals assigned to each bit size in the Huffman encoding // // This method is only called when list.length >= 3 @@ -327,37 +337,40 @@ func atLeastOne(v float32) float32 { return v } +// Unassigned values are assigned '1' in the histogram. +func fillHist(b []uint16) { + for i, v := range b { + if v == 0 { + b[i] = 1 + } + } +} + // histogramSize accumulates a histogram of b in h. // An estimated size in bits is returned. -// Unassigned values are assigned '1' in the histogram. // len(h) must be >= 256, and h's elements must be all zeroes. -func histogramSize(b []byte, h []uint16, fill bool) (int, int) { +func histogramSize(b []byte, h []uint16, fill bool) (bits int) { h = h[:256] for _, t := range b { h[t]++ } - invTotal := 1.0 / float32(len(b)) - shannon := float32(0.0) - var extra float32 + total := len(b) if fill { - oneBits := atLeastOne(-mFastLog2(invTotal)) - for i, v := range h[:] { - if v > 0 { - n := float32(v) - shannon += atLeastOne(-mFastLog2(n*invTotal)) * n - } else { - h[i] = 1 - extra += oneBits - } - } - } else { - for _, v := range h[:] { - if v > 0 { - n := float32(v) - shannon += atLeastOne(-mFastLog2(n*invTotal)) * n + for _, v := range h { + if v == 0 { + total++ } } } - return int(shannon + 0.99), int(extra + 0.99) + invTotal := 1.0 / float32(total) + shannon := float32(0.0) + for _, v := range h { + if v > 0 { + n := float32(v) + shannon += atLeastOne(-mFastLog2(n*invTotal)) * n + } + } + + return int(shannon + 0.99) } diff --git a/vendor/github.com/klauspost/compress/flate/level2.go b/vendor/github.com/klauspost/compress/flate/level2.go index 5b986a194..234c4389a 100644 --- a/vendor/github.com/klauspost/compress/flate/level2.go +++ b/vendor/github.com/klauspost/compress/flate/level2.go @@ -155,7 +155,7 @@ func (e *fastEncL2) Encode(dst *tokens, src []byte) { // Store every second hash in-between, but offset by 1. for i := s - l + 2; i < s-5; i += 7 { - x := load6432(src, int32(i)) + x := load6432(src, i) nextHash := hash4u(uint32(x), bTableBits) e.table[nextHash] = tableEntry{offset: e.cur + i} // Skip one diff --git a/vendor/github.com/klauspost/compress/fse/compress.go b/vendor/github.com/klauspost/compress/fse/compress.go index b69237c9b..0d31f5ebc 100644 --- a/vendor/github.com/klauspost/compress/fse/compress.go +++ b/vendor/github.com/klauspost/compress/fse/compress.go @@ -301,7 +301,7 @@ func (s *Scratch) writeCount() error { out[outP+1] = byte(bitStream >> 8) outP += (bitCount + 7) / 8 - if uint16(charnum) > s.symbolLen { + if charnum > s.symbolLen { return errors.New("internal error: charnum > s.symbolLen") } s.Out = out[:outP] @@ -331,7 +331,7 @@ type cTable struct { func (s *Scratch) allocCtable() { tableSize := 1 << s.actualTableLog // get tableSymbol that is big enough. - if cap(s.ct.tableSymbol) < int(tableSize) { + if cap(s.ct.tableSymbol) < tableSize { s.ct.tableSymbol = make([]byte, tableSize) } s.ct.tableSymbol = s.ct.tableSymbol[:tableSize] @@ -565,8 +565,8 @@ func (s *Scratch) normalizeCount2() error { distributed uint32 total = uint32(s.br.remain()) tableLog = s.actualTableLog - lowThreshold = uint32(total >> tableLog) - lowOne = uint32((total * 3) >> (tableLog + 1)) + lowThreshold = total >> tableLog + lowOne = (total * 3) >> (tableLog + 1) ) for i, cnt := range s.count[:s.symbolLen] { if cnt == 0 { @@ -591,7 +591,7 @@ func (s *Scratch) normalizeCount2() error { if (total / toDistribute) > lowOne { // risk of rounding to zero - lowOne = uint32((total * 3) / (toDistribute * 2)) + lowOne = (total * 3) / (toDistribute * 2) for i, cnt := range s.count[:s.symbolLen] { if (s.norm[i] == notYetAssigned) && (cnt <= lowOne) { s.norm[i] = 1 diff --git a/vendor/github.com/klauspost/compress/fse/decompress.go b/vendor/github.com/klauspost/compress/fse/decompress.go index 413ec3b3c..926f5f153 100644 --- a/vendor/github.com/klauspost/compress/fse/decompress.go +++ b/vendor/github.com/klauspost/compress/fse/decompress.go @@ -172,7 +172,7 @@ type decSymbol struct { // allocDtable will allocate decoding tables if they are not big enough. func (s *Scratch) allocDtable() { tableSize := 1 << s.actualTableLog - if cap(s.decTable) < int(tableSize) { + if cap(s.decTable) < tableSize { s.decTable = make([]decSymbol, tableSize) } s.decTable = s.decTable[:tableSize] @@ -340,7 +340,7 @@ type decoder struct { func (d *decoder) init(in *bitReader, dt []decSymbol, tableLog uint8) { d.dt = dt d.br = in - d.state = uint16(in.getBits(tableLog)) + d.state = in.getBits(tableLog) } // next returns the next symbol and sets the next state. diff --git a/vendor/github.com/klauspost/compress/huff0/compress.go b/vendor/github.com/klauspost/compress/huff0/compress.go index f9ed5f830..dea115b33 100644 --- a/vendor/github.com/klauspost/compress/huff0/compress.go +++ b/vendor/github.com/klauspost/compress/huff0/compress.go @@ -403,7 +403,7 @@ func (s *Scratch) buildCTable() error { var startNode = int16(s.symbolLen) nonNullRank := s.symbolLen - 1 - nodeNb := int16(startNode) + nodeNb := startNode huffNode := s.nodes[1 : huffNodesLen+1] // This overlays the slice above, but allows "-1" index lookups. @@ -580,7 +580,7 @@ func (s *Scratch) setMaxHeight(lastNonNull int) uint8 { // Get pos of last (smallest) symbol per rank { - currentNbBits := uint8(maxNbBits) + currentNbBits := maxNbBits for pos := int(n); pos >= 0; pos-- { if huffNode[pos].nbBits >= currentNbBits { continue diff --git a/vendor/github.com/klauspost/compress/snappy/snappy.go b/vendor/github.com/klauspost/compress/snappy/snappy.go index 74a36689e..ea58ced88 100644 --- a/vendor/github.com/klauspost/compress/snappy/snappy.go +++ b/vendor/github.com/klauspost/compress/snappy/snappy.go @@ -94,5 +94,5 @@ var crcTable = crc32.MakeTable(crc32.Castagnoli) // https://github.com/google/snappy/blob/master/framing_format.txt func crc(b []byte) uint32 { c := crc32.Update(0, crcTable, b) - return uint32(c>>15|c<<17) + 0xa282ead8 + return c>>15 | c<<17 + 0xa282ead8 } diff --git a/vendor/github.com/klauspost/compress/zstd/blockenc.go b/vendor/github.com/klauspost/compress/zstd/blockenc.go index c85c40255..9647c64e5 100644 --- a/vendor/github.com/klauspost/compress/zstd/blockenc.go +++ b/vendor/github.com/klauspost/compress/zstd/blockenc.go @@ -22,28 +22,44 @@ type blockEnc struct { dictLitEnc *huff0.Scratch wr bitWriter - extraLits int - last bool - + extraLits int output []byte recentOffsets [3]uint32 prevRecentOffsets [3]uint32 + + last bool + lowMem bool } // init should be used once the block has been created. // If called more than once, the effect is the same as calling reset. func (b *blockEnc) init() { - if cap(b.literals) < maxCompressedLiteralSize { - b.literals = make([]byte, 0, maxCompressedLiteralSize) - } - const defSeqs = 200 - b.literals = b.literals[:0] - if cap(b.sequences) < defSeqs { - b.sequences = make([]seq, 0, defSeqs) - } - if cap(b.output) < maxCompressedBlockSize { - b.output = make([]byte, 0, maxCompressedBlockSize) + if b.lowMem { + // 1K literals + if cap(b.literals) < 1<<10 { + b.literals = make([]byte, 0, 1<<10) + } + const defSeqs = 20 + if cap(b.sequences) < defSeqs { + b.sequences = make([]seq, 0, defSeqs) + } + // 1K + if cap(b.output) < 1<<10 { + b.output = make([]byte, 0, 1<<10) + } + } else { + if cap(b.literals) < maxCompressedBlockSize { + b.literals = make([]byte, 0, maxCompressedBlockSize) + } + const defSeqs = 200 + if cap(b.sequences) < defSeqs { + b.sequences = make([]seq, 0, defSeqs) + } + if cap(b.output) < maxCompressedBlockSize { + b.output = make([]byte, 0, maxCompressedBlockSize) + } } + if b.coders.mlEnc == nil { b.coders.mlEnc = &fseEncoder{} b.coders.mlPrev = &fseEncoder{} diff --git a/vendor/github.com/klauspost/compress/zstd/enc_base.go b/vendor/github.com/klauspost/compress/zstd/enc_base.go index b1b7c6e6a..2d4d893eb 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_base.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_base.go @@ -7,6 +7,10 @@ import ( "github.com/klauspost/compress/zstd/internal/xxhash" ) +const ( + dictShardBits = 6 +) + type fastBase struct { // cur is the offset at the start of hist cur int32 @@ -17,6 +21,7 @@ type fastBase struct { tmp [8]byte blk *blockEnc lastDictID uint32 + lowMem bool } // CRC returns the underlying CRC writer. @@ -57,15 +62,10 @@ func (e *fastBase) addBlock(src []byte) int32 { // check if we have space already if len(e.hist)+len(src) > cap(e.hist) { if cap(e.hist) == 0 { - l := e.maxMatchOff * 2 - // Make it at least 1MB. - if l < 1<<20 { - l = 1 << 20 - } - e.hist = make([]byte, 0, l) + e.ensureHist(len(src)) } else { - if cap(e.hist) < int(e.maxMatchOff*2) { - panic("unexpected buffer size") + if cap(e.hist) < int(e.maxMatchOff+maxCompressedBlockSize) { + panic(fmt.Errorf("unexpected buffer cap %d, want at least %d with window %d", cap(e.hist), e.maxMatchOff+maxCompressedBlockSize, e.maxMatchOff)) } // Move down offset := int32(len(e.hist)) - e.maxMatchOff @@ -79,6 +79,28 @@ func (e *fastBase) addBlock(src []byte) int32 { return s } +// ensureHist will ensure that history can keep at least this many bytes. +func (e *fastBase) ensureHist(n int) { + if cap(e.hist) >= n { + return + } + l := e.maxMatchOff + if (e.lowMem && e.maxMatchOff > maxCompressedBlockSize) || e.maxMatchOff <= maxCompressedBlockSize { + l += maxCompressedBlockSize + } else { + l += e.maxMatchOff + } + // Make it at least 1MB. + if l < 1<<20 && !e.lowMem { + l = 1 << 20 + } + // Make it at least the requested size. + if l < int32(n) { + l = int32(n) + } + e.hist = make([]byte, 0, l) +} + // useBlock will replace the block with the provided one, // but transfer recent offsets from the previous. func (e *fastBase) UseBlock(enc *blockEnc) { @@ -117,7 +139,7 @@ func (e *fastBase) matchlen(s, t int32, src []byte) int32 { // Reset the encoding table. func (e *fastBase) resetBase(d *dict, singleBlock bool) { if e.blk == nil { - e.blk = &blockEnc{} + e.blk = &blockEnc{lowMem: e.lowMem} e.blk.init() } else { e.blk.reset(nil) diff --git a/vendor/github.com/klauspost/compress/zstd/enc_best.go b/vendor/github.com/klauspost/compress/zstd/enc_best.go index bb71d1eea..fe3625c5f 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_best.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_best.go @@ -407,6 +407,7 @@ encodeLoop: // Most notable difference is that src will not be copied for history and // we do not need to check for max match length. func (e *bestFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { + e.ensureHist(len(src)) e.Encode(blk, src) } diff --git a/vendor/github.com/klauspost/compress/zstd/enc_better.go b/vendor/github.com/klauspost/compress/zstd/enc_better.go index 94a5343d0..c2ce4a2ba 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_better.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_better.go @@ -16,6 +16,12 @@ const ( // This greatly depends on the type of input. betterShortTableBits = 13 // Bits used in the short match table betterShortTableSize = 1 << betterShortTableBits // Size of the table + + betterLongTableShardCnt = 1 << (betterLongTableBits - dictShardBits) // Number of shards in the table + betterLongTableShardSize = betterLongTableSize / betterLongTableShardCnt // Size of an individual shard + + betterShortTableShardCnt = 1 << (betterShortTableBits - dictShardBits) // Number of shards in the table + betterShortTableShardSize = betterShortTableSize / betterShortTableShardCnt // Size of an individual shard ) type prevEntry struct { @@ -31,10 +37,17 @@ type prevEntry struct { // and that it is longer (lazy matching). type betterFastEncoder struct { fastBase - table [betterShortTableSize]tableEntry - longTable [betterLongTableSize]prevEntry - dictTable []tableEntry - dictLongTable []prevEntry + table [betterShortTableSize]tableEntry + longTable [betterLongTableSize]prevEntry +} + +type betterFastEncoderDict struct { + betterFastEncoder + dictTable []tableEntry + dictLongTable []prevEntry + shortTableShardDirty [betterShortTableShardCnt]bool + longTableShardDirty [betterLongTableShardCnt]bool + allDirty bool } // Encode improves compression... @@ -516,11 +529,511 @@ encodeLoop: // Most notable difference is that src will not be copied for history and // we do not need to check for max match length. func (e *betterFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { + e.ensureHist(len(src)) e.Encode(blk, src) } +// Encode improves compression... +func (e *betterFastEncoderDict) Encode(blk *blockEnc, src []byte) { + const ( + // Input margin is the number of bytes we read (8) + // and the maximum we will read ahead (2) + inputMargin = 8 + 2 + minNonLiteralBlockSize = 16 + ) + + // Protect against e.cur wraparound. + for e.cur >= bufferReset { + if len(e.hist) == 0 { + for i := range e.table[:] { + e.table[i] = tableEntry{} + } + for i := range e.longTable[:] { + e.longTable[i] = prevEntry{} + } + e.cur = e.maxMatchOff + e.allDirty = true + break + } + // Shift down everything in the table that isn't already too far away. + minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff + for i := range e.table[:] { + v := e.table[i].offset + if v < minOff { + v = 0 + } else { + v = v - e.cur + e.maxMatchOff + } + e.table[i].offset = v + } + for i := range e.longTable[:] { + v := e.longTable[i].offset + v2 := e.longTable[i].prev + if v < minOff { + v = 0 + v2 = 0 + } else { + v = v - e.cur + e.maxMatchOff + if v2 < minOff { + v2 = 0 + } else { + v2 = v2 - e.cur + e.maxMatchOff + } + } + e.longTable[i] = prevEntry{ + offset: v, + prev: v2, + } + } + e.allDirty = true + e.cur = e.maxMatchOff + break + } + + s := e.addBlock(src) + blk.size = len(src) + if len(src) < minNonLiteralBlockSize { + blk.extraLits = len(src) + blk.literals = blk.literals[:len(src)] + copy(blk.literals, src) + return + } + + // Override src + src = e.hist + sLimit := int32(len(src)) - inputMargin + // stepSize is the number of bytes to skip on every main loop iteration. + // It should be >= 1. + const stepSize = 1 + + const kSearchStrength = 9 + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := s + cv := load6432(src, s) + + // Relative offsets + offset1 := int32(blk.recentOffsets[0]) + offset2 := int32(blk.recentOffsets[1]) + + addLiterals := func(s *seq, until int32) { + if until == nextEmit { + return + } + blk.literals = append(blk.literals, src[nextEmit:until]...) + s.litLen = uint32(until - nextEmit) + } + if debug { + println("recent offsets:", blk.recentOffsets) + } + +encodeLoop: + for { + var t int32 + // We allow the encoder to optionally turn off repeat offsets across blocks + canRepeat := len(blk.sequences) > 2 + var matched int32 + + for { + if debugAsserts && canRepeat && offset1 == 0 { + panic("offset0 was 0") + } + + nextHashS := hash5(cv, betterShortTableBits) + nextHashL := hash8(cv, betterLongTableBits) + candidateL := e.longTable[nextHashL] + candidateS := e.table[nextHashS] + + const repOff = 1 + repIndex := s - offset1 + repOff + off := s + e.cur + e.longTable[nextHashL] = prevEntry{offset: off, prev: candidateL.offset} + e.markLongShardDirty(nextHashL) + e.table[nextHashS] = tableEntry{offset: off, val: uint32(cv)} + e.markShortShardDirty(nextHashS) + + if canRepeat { + if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) { + // Consider history as well. + var seq seq + lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src) + + seq.matchLen = uint32(lenght - zstdMinMatch) + + // We might be able to match backwards. + // Extend as long as we can. + start := s + repOff + // We end the search early, so we don't risk 0 literals + // and have to do special offset treatment. + startLimit := nextEmit + 1 + + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 { + repIndex-- + start-- + seq.matchLen++ + } + addLiterals(&seq, start) + + // rep 0 + seq.offset = 1 + if debugSequences { + println("repeat sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + // Index match start+1 (long) -> s - 1 + index0 := s + repOff + s += lenght + repOff + + nextEmit = s + if s >= sLimit { + if debug { + println("repeat ended", s, lenght) + + } + break encodeLoop + } + // Index skipped... + for index0 < s-1 { + cv0 := load6432(src, index0) + cv1 := cv0 >> 8 + h0 := hash8(cv0, betterLongTableBits) + off := index0 + e.cur + e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset} + e.markLongShardDirty(h0) + h1 := hash5(cv1, betterShortTableBits) + e.table[h1] = tableEntry{offset: off + 1, val: uint32(cv1)} + e.markShortShardDirty(h1) + index0 += 2 + } + cv = load6432(src, s) + continue + } + const repOff2 = 1 + + // We deviate from the reference encoder and also check offset 2. + // Still slower and not much better, so disabled. + // repIndex = s - offset2 + repOff2 + if false && repIndex >= 0 && load6432(src, repIndex) == load6432(src, s+repOff) { + // Consider history as well. + var seq seq + lenght := 8 + e.matchlen(s+8+repOff2, repIndex+8, src) + + seq.matchLen = uint32(lenght - zstdMinMatch) + + // We might be able to match backwards. + // Extend as long as we can. + start := s + repOff2 + // We end the search early, so we don't risk 0 literals + // and have to do special offset treatment. + startLimit := nextEmit + 1 + + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 { + repIndex-- + start-- + seq.matchLen++ + } + addLiterals(&seq, start) + + // rep 2 + seq.offset = 2 + if debugSequences { + println("repeat sequence 2", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + index0 := s + repOff2 + s += lenght + repOff2 + nextEmit = s + if s >= sLimit { + if debug { + println("repeat ended", s, lenght) + + } + break encodeLoop + } + + // Index skipped... + for index0 < s-1 { + cv0 := load6432(src, index0) + cv1 := cv0 >> 8 + h0 := hash8(cv0, betterLongTableBits) + off := index0 + e.cur + e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset} + e.markLongShardDirty(h0) + h1 := hash5(cv1, betterShortTableBits) + e.table[h1] = tableEntry{offset: off + 1, val: uint32(cv1)} + e.markShortShardDirty(h1) + index0 += 2 + } + cv = load6432(src, s) + // Swap offsets + offset1, offset2 = offset2, offset1 + continue + } + } + // Find the offsets of our two matches. + coffsetL := candidateL.offset - e.cur + coffsetLP := candidateL.prev - e.cur + + // Check if we have a long match. + if s-coffsetL < e.maxMatchOff && cv == load6432(src, coffsetL) { + // Found a long match, at least 8 bytes. + matched = e.matchlen(s+8, coffsetL+8, src) + 8 + t = coffsetL + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugMatches { + println("long match") + } + + if s-coffsetLP < e.maxMatchOff && cv == load6432(src, coffsetLP) { + // Found a long match, at least 8 bytes. + prevMatch := e.matchlen(s+8, coffsetLP+8, src) + 8 + if prevMatch > matched { + matched = prevMatch + t = coffsetLP + } + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugMatches { + println("long match") + } + } + break + } + + // Check if we have a long match on prev. + if s-coffsetLP < e.maxMatchOff && cv == load6432(src, coffsetLP) { + // Found a long match, at least 8 bytes. + matched = e.matchlen(s+8, coffsetLP+8, src) + 8 + t = coffsetLP + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugMatches { + println("long match") + } + break + } + + coffsetS := candidateS.offset - e.cur + + // Check if we have a short match. + if s-coffsetS < e.maxMatchOff && uint32(cv) == candidateS.val { + // found a regular match + matched = e.matchlen(s+4, coffsetS+4, src) + 4 + + // See if we can find a long match at s+1 + const checkAt = 1 + cv := load6432(src, s+checkAt) + nextHashL = hash8(cv, betterLongTableBits) + candidateL = e.longTable[nextHashL] + coffsetL = candidateL.offset - e.cur + + // We can store it, since we have at least a 4 byte match. + e.longTable[nextHashL] = prevEntry{offset: s + checkAt + e.cur, prev: candidateL.offset} + e.markLongShardDirty(nextHashL) + if s-coffsetL < e.maxMatchOff && cv == load6432(src, coffsetL) { + // Found a long match, at least 8 bytes. + matchedNext := e.matchlen(s+8+checkAt, coffsetL+8, src) + 8 + if matchedNext > matched { + t = coffsetL + s += checkAt + matched = matchedNext + if debugMatches { + println("long match (after short)") + } + break + } + } + + // Check prev long... + coffsetL = candidateL.prev - e.cur + if s-coffsetL < e.maxMatchOff && cv == load6432(src, coffsetL) { + // Found a long match, at least 8 bytes. + matchedNext := e.matchlen(s+8+checkAt, coffsetL+8, src) + 8 + if matchedNext > matched { + t = coffsetL + s += checkAt + matched = matchedNext + if debugMatches { + println("prev long match (after short)") + } + break + } + } + t = coffsetS + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugAsserts && t < 0 { + panic("t<0") + } + if debugMatches { + println("short match") + } + break + } + + // No match found, move forward in input. + s += stepSize + ((s - nextEmit) >> (kSearchStrength - 1)) + if s >= sLimit { + break encodeLoop + } + cv = load6432(src, s) + } + + // A 4-byte match has been found. Update recent offsets. + // We'll later see if more than 4 bytes. + offset2 = offset1 + offset1 = s - t + + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + + if debugAsserts && canRepeat && int(offset1) > len(src) { + panic("invalid offset") + } + + // Extend the n-byte match as long as possible. + l := matched + + // Extend backwards + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength { + s-- + t-- + l++ + } + + // Write our sequence + var seq seq + seq.litLen = uint32(s - nextEmit) + seq.matchLen = uint32(l - zstdMinMatch) + if seq.litLen > 0 { + blk.literals = append(blk.literals, src[nextEmit:s]...) + } + seq.offset = uint32(s-t) + 3 + s += l + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + nextEmit = s + if s >= sLimit { + break encodeLoop + } + + // Index match start+1 (long) -> s - 1 + index0 := s - l + 1 + for index0 < s-1 { + cv0 := load6432(src, index0) + cv1 := cv0 >> 8 + h0 := hash8(cv0, betterLongTableBits) + off := index0 + e.cur + e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset} + e.markLongShardDirty(h0) + h1 := hash5(cv1, betterShortTableBits) + e.table[h1] = tableEntry{offset: off + 1, val: uint32(cv1)} + e.markShortShardDirty(h1) + index0 += 2 + } + + cv = load6432(src, s) + if !canRepeat { + continue + } + + // Check offset 2 + for { + o2 := s - offset2 + if load3232(src, o2) != uint32(cv) { + // Do regular search + break + } + + // Store this, since we have it. + nextHashS := hash5(cv, betterShortTableBits) + nextHashL := hash8(cv, betterLongTableBits) + + // We have at least 4 byte match. + // No need to check backwards. We come straight from a match + l := 4 + e.matchlen(s+4, o2+4, src) + + e.longTable[nextHashL] = prevEntry{offset: s + e.cur, prev: e.longTable[nextHashL].offset} + e.markLongShardDirty(nextHashL) + e.table[nextHashS] = tableEntry{offset: s + e.cur, val: uint32(cv)} + e.markShortShardDirty(nextHashS) + seq.matchLen = uint32(l) - zstdMinMatch + seq.litLen = 0 + + // Since litlen is always 0, this is offset 1. + seq.offset = 1 + s += l + nextEmit = s + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + // Swap offset 1 and 2. + offset1, offset2 = offset2, offset1 + if s >= sLimit { + // Finished + break encodeLoop + } + cv = load6432(src, s) + } + } + + if int(nextEmit) < len(src) { + blk.literals = append(blk.literals, src[nextEmit:]...) + blk.extraLits = len(src) - int(nextEmit) + } + blk.recentOffsets[0] = uint32(offset1) + blk.recentOffsets[1] = uint32(offset2) + if debug { + println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits) + } +} + // ResetDict will reset and set a dictionary if not nil func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) { + e.resetBase(d, singleBlock) + if d != nil { + panic("betterFastEncoder: Reset with dict") + } +} + +// ResetDict will reset and set a dictionary if not nil +func (e *betterFastEncoderDict) Reset(d *dict, singleBlock bool) { e.resetBase(d, singleBlock) if d == nil { return @@ -557,6 +1070,7 @@ func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) { } } e.lastDictID = d.id + e.allDirty = true } // Init or copy dict table @@ -585,11 +1099,72 @@ func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) { } } e.lastDictID = d.id + e.allDirty = true } - // Reset table to initial state - copy(e.longTable[:], e.dictLongTable) - e.cur = e.maxMatchOff // Reset table to initial state - copy(e.table[:], e.dictTable) + { + dirtyShardCnt := 0 + if !e.allDirty { + for i := range e.shortTableShardDirty { + if e.shortTableShardDirty[i] { + dirtyShardCnt++ + } + } + } + const shardCnt = betterShortTableShardCnt + const shardSize = betterShortTableShardSize + if e.allDirty || dirtyShardCnt > shardCnt*4/6 { + copy(e.table[:], e.dictTable) + for i := range e.shortTableShardDirty { + e.shortTableShardDirty[i] = false + } + } else { + for i := range e.shortTableShardDirty { + if !e.shortTableShardDirty[i] { + continue + } + + copy(e.table[i*shardSize:(i+1)*shardSize], e.dictTable[i*shardSize:(i+1)*shardSize]) + e.shortTableShardDirty[i] = false + } + } + } + { + dirtyShardCnt := 0 + if !e.allDirty { + for i := range e.shortTableShardDirty { + if e.shortTableShardDirty[i] { + dirtyShardCnt++ + } + } + } + const shardCnt = betterLongTableShardCnt + const shardSize = betterLongTableShardSize + if e.allDirty || dirtyShardCnt > shardCnt*4/6 { + copy(e.longTable[:], e.dictLongTable) + for i := range e.longTableShardDirty { + e.longTableShardDirty[i] = false + } + } else { + for i := range e.longTableShardDirty { + if !e.longTableShardDirty[i] { + continue + } + + copy(e.longTable[i*shardSize:(i+1)*shardSize], e.dictLongTable[i*shardSize:(i+1)*shardSize]) + e.longTableShardDirty[i] = false + } + } + } + e.cur = e.maxMatchOff + e.allDirty = false +} + +func (e *betterFastEncoderDict) markLongShardDirty(entryNum uint32) { + e.longTableShardDirty[entryNum/betterLongTableShardSize] = true +} + +func (e *betterFastEncoderDict) markShortShardDirty(entryNum uint32) { + e.shortTableShardDirty[entryNum/betterShortTableShardSize] = true } diff --git a/vendor/github.com/klauspost/compress/zstd/enc_dfast.go b/vendor/github.com/klauspost/compress/zstd/enc_dfast.go index 19eebf66e..8629d43d8 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_dfast.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_dfast.go @@ -11,6 +11,9 @@ const ( dFastLongTableSize = 1 << dFastLongTableBits // Size of the table dFastLongTableMask = dFastLongTableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. + dLongTableShardCnt = 1 << (dFastLongTableBits - dictShardBits) // Number of shards in the table + dLongTableShardSize = dFastLongTableSize / tableShardCnt // Size of an individual shard + dFastShortTableBits = tableBits // Bits used in the short match table dFastShortTableSize = 1 << dFastShortTableBits // Size of the table dFastShortTableMask = dFastShortTableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. @@ -18,8 +21,14 @@ const ( type doubleFastEncoder struct { fastEncoder - longTable [dFastLongTableSize]tableEntry - dictLongTable []tableEntry + longTable [dFastLongTableSize]tableEntry +} + +type doubleFastEncoderDict struct { + fastEncoderDict + longTable [dFastLongTableSize]tableEntry + dictLongTable []tableEntry + longTableShardDirty [dLongTableShardCnt]bool } // Encode mimmics functionality in zstd_dfast.c @@ -678,9 +687,379 @@ encodeLoop: } } +// Encode will encode the content, with a dictionary if initialized for it. +func (e *doubleFastEncoderDict) Encode(blk *blockEnc, src []byte) { + const ( + // Input margin is the number of bytes we read (8) + // and the maximum we will read ahead (2) + inputMargin = 8 + 2 + minNonLiteralBlockSize = 16 + ) + + // Protect against e.cur wraparound. + for e.cur >= bufferReset { + if len(e.hist) == 0 { + for i := range e.table[:] { + e.table[i] = tableEntry{} + } + for i := range e.longTable[:] { + e.longTable[i] = tableEntry{} + } + e.markAllShardsDirty() + e.cur = e.maxMatchOff + break + } + // Shift down everything in the table that isn't already too far away. + minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff + for i := range e.table[:] { + v := e.table[i].offset + if v < minOff { + v = 0 + } else { + v = v - e.cur + e.maxMatchOff + } + e.table[i].offset = v + } + for i := range e.longTable[:] { + v := e.longTable[i].offset + if v < minOff { + v = 0 + } else { + v = v - e.cur + e.maxMatchOff + } + e.longTable[i].offset = v + } + e.markAllShardsDirty() + e.cur = e.maxMatchOff + break + } + + s := e.addBlock(src) + blk.size = len(src) + if len(src) < minNonLiteralBlockSize { + blk.extraLits = len(src) + blk.literals = blk.literals[:len(src)] + copy(blk.literals, src) + return + } + + // Override src + src = e.hist + sLimit := int32(len(src)) - inputMargin + // stepSize is the number of bytes to skip on every main loop iteration. + // It should be >= 1. + const stepSize = 1 + + const kSearchStrength = 8 + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := s + cv := load6432(src, s) + + // Relative offsets + offset1 := int32(blk.recentOffsets[0]) + offset2 := int32(blk.recentOffsets[1]) + + addLiterals := func(s *seq, until int32) { + if until == nextEmit { + return + } + blk.literals = append(blk.literals, src[nextEmit:until]...) + s.litLen = uint32(until - nextEmit) + } + if debug { + println("recent offsets:", blk.recentOffsets) + } + +encodeLoop: + for { + var t int32 + // We allow the encoder to optionally turn off repeat offsets across blocks + canRepeat := len(blk.sequences) > 2 + + for { + if debugAsserts && canRepeat && offset1 == 0 { + panic("offset0 was 0") + } + + nextHashS := hash5(cv, dFastShortTableBits) + nextHashL := hash8(cv, dFastLongTableBits) + candidateL := e.longTable[nextHashL] + candidateS := e.table[nextHashS] + + const repOff = 1 + repIndex := s - offset1 + repOff + entry := tableEntry{offset: s + e.cur, val: uint32(cv)} + e.longTable[nextHashL] = entry + e.markLongShardDirty(nextHashL) + e.table[nextHashS] = entry + e.markShardDirty(nextHashS) + + if canRepeat { + if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) { + // Consider history as well. + var seq seq + lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src) + + seq.matchLen = uint32(lenght - zstdMinMatch) + + // We might be able to match backwards. + // Extend as long as we can. + start := s + repOff + // We end the search early, so we don't risk 0 literals + // and have to do special offset treatment. + startLimit := nextEmit + 1 + + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 { + repIndex-- + start-- + seq.matchLen++ + } + addLiterals(&seq, start) + + // rep 0 + seq.offset = 1 + if debugSequences { + println("repeat sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + s += lenght + repOff + nextEmit = s + if s >= sLimit { + if debug { + println("repeat ended", s, lenght) + + } + break encodeLoop + } + cv = load6432(src, s) + continue + } + } + // Find the offsets of our two matches. + coffsetL := s - (candidateL.offset - e.cur) + coffsetS := s - (candidateS.offset - e.cur) + + // Check if we have a long match. + if coffsetL < e.maxMatchOff && uint32(cv) == candidateL.val { + // Found a long match, likely at least 8 bytes. + // Reference encoder checks all 8 bytes, we only check 4, + // but the likelihood of both the first 4 bytes and the hash matching should be enough. + t = candidateL.offset - e.cur + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugMatches { + println("long match") + } + break + } + + // Check if we have a short match. + if coffsetS < e.maxMatchOff && uint32(cv) == candidateS.val { + // found a regular match + // See if we can find a long match at s+1 + const checkAt = 1 + cv := load6432(src, s+checkAt) + nextHashL = hash8(cv, dFastLongTableBits) + candidateL = e.longTable[nextHashL] + coffsetL = s - (candidateL.offset - e.cur) + checkAt + + // We can store it, since we have at least a 4 byte match. + e.longTable[nextHashL] = tableEntry{offset: s + checkAt + e.cur, val: uint32(cv)} + e.markLongShardDirty(nextHashL) + if coffsetL < e.maxMatchOff && uint32(cv) == candidateL.val { + // Found a long match, likely at least 8 bytes. + // Reference encoder checks all 8 bytes, we only check 4, + // but the likelihood of both the first 4 bytes and the hash matching should be enough. + t = candidateL.offset - e.cur + s += checkAt + if debugMatches { + println("long match (after short)") + } + break + } + + t = candidateS.offset - e.cur + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugAsserts && t < 0 { + panic("t<0") + } + if debugMatches { + println("short match") + } + break + } + + // No match found, move forward in input. + s += stepSize + ((s - nextEmit) >> (kSearchStrength - 1)) + if s >= sLimit { + break encodeLoop + } + cv = load6432(src, s) + } + + // A 4-byte match has been found. Update recent offsets. + // We'll later see if more than 4 bytes. + offset2 = offset1 + offset1 = s - t + + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + + if debugAsserts && canRepeat && int(offset1) > len(src) { + panic("invalid offset") + } + + // Extend the 4-byte match as long as possible. + l := e.matchlen(s+4, t+4, src) + 4 + + // Extend backwards + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength { + s-- + t-- + l++ + } + + // Write our sequence + var seq seq + seq.litLen = uint32(s - nextEmit) + seq.matchLen = uint32(l - zstdMinMatch) + if seq.litLen > 0 { + blk.literals = append(blk.literals, src[nextEmit:s]...) + } + seq.offset = uint32(s-t) + 3 + s += l + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + nextEmit = s + if s >= sLimit { + break encodeLoop + } + + // Index match start+1 (long) and start+2 (short) + index0 := s - l + 1 + // Index match end-2 (long) and end-1 (short) + index1 := s - 2 + + cv0 := load6432(src, index0) + cv1 := load6432(src, index1) + te0 := tableEntry{offset: index0 + e.cur, val: uint32(cv0)} + te1 := tableEntry{offset: index1 + e.cur, val: uint32(cv1)} + longHash1 := hash8(cv0, dFastLongTableBits) + longHash2 := hash8(cv0, dFastLongTableBits) + e.longTable[longHash1] = te0 + e.longTable[longHash2] = te1 + e.markLongShardDirty(longHash1) + e.markLongShardDirty(longHash2) + cv0 >>= 8 + cv1 >>= 8 + te0.offset++ + te1.offset++ + te0.val = uint32(cv0) + te1.val = uint32(cv1) + hashVal1 := hash5(cv0, dFastShortTableBits) + hashVal2 := hash5(cv1, dFastShortTableBits) + e.table[hashVal1] = te0 + e.markShardDirty(hashVal1) + e.table[hashVal2] = te1 + e.markShardDirty(hashVal2) + + cv = load6432(src, s) + + if !canRepeat { + continue + } + + // Check offset 2 + for { + o2 := s - offset2 + if load3232(src, o2) != uint32(cv) { + // Do regular search + break + } + + // Store this, since we have it. + nextHashS := hash5(cv, dFastShortTableBits) + nextHashL := hash8(cv, dFastLongTableBits) + + // We have at least 4 byte match. + // No need to check backwards. We come straight from a match + l := 4 + e.matchlen(s+4, o2+4, src) + + entry := tableEntry{offset: s + e.cur, val: uint32(cv)} + e.longTable[nextHashL] = entry + e.markLongShardDirty(nextHashL) + e.table[nextHashS] = entry + e.markShardDirty(nextHashS) + seq.matchLen = uint32(l) - zstdMinMatch + seq.litLen = 0 + + // Since litlen is always 0, this is offset 1. + seq.offset = 1 + s += l + nextEmit = s + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + // Swap offset 1 and 2. + offset1, offset2 = offset2, offset1 + if s >= sLimit { + // Finished + break encodeLoop + } + cv = load6432(src, s) + } + } + + if int(nextEmit) < len(src) { + blk.literals = append(blk.literals, src[nextEmit:]...) + blk.extraLits = len(src) - int(nextEmit) + } + blk.recentOffsets[0] = uint32(offset1) + blk.recentOffsets[1] = uint32(offset2) + if debug { + println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits) + } + // If we encoded more than 64K mark all dirty. + if len(src) > 64<<10 { + e.markAllShardsDirty() + } +} + // ResetDict will reset and set a dictionary if not nil func (e *doubleFastEncoder) Reset(d *dict, singleBlock bool) { e.fastEncoder.Reset(d, singleBlock) + if d != nil { + panic("doubleFastEncoder: Reset with dict not supported") + } +} + +// ResetDict will reset and set a dictionary if not nil +func (e *doubleFastEncoderDict) Reset(d *dict, singleBlock bool) { + allDirty := e.allDirty + e.fastEncoderDict.Reset(d, singleBlock) if d == nil { return } @@ -706,8 +1085,37 @@ func (e *doubleFastEncoder) Reset(d *dict, singleBlock bool) { } } e.lastDictID = d.id + e.allDirty = true } // Reset table to initial state e.cur = e.maxMatchOff - copy(e.longTable[:], e.dictLongTable) + + dirtyShardCnt := 0 + if !allDirty { + for i := range e.longTableShardDirty { + if e.longTableShardDirty[i] { + dirtyShardCnt++ + } + } + } + + if allDirty || dirtyShardCnt > dLongTableShardCnt/2 { + copy(e.longTable[:], e.dictLongTable) + for i := range e.longTableShardDirty { + e.longTableShardDirty[i] = false + } + return + } + for i := range e.longTableShardDirty { + if !e.longTableShardDirty[i] { + continue + } + + copy(e.longTable[i*dLongTableShardSize:(i+1)*dLongTableShardSize], e.dictLongTable[i*dLongTableShardSize:(i+1)*dLongTableShardSize]) + e.longTableShardDirty[i] = false + } +} + +func (e *doubleFastEncoderDict) markLongShardDirty(entryNum uint32) { + e.longTableShardDirty[entryNum/dLongTableShardSize] = true } diff --git a/vendor/github.com/klauspost/compress/zstd/enc_fast.go b/vendor/github.com/klauspost/compress/zstd/enc_fast.go index 0045016d9..ba4a17e10 100644 --- a/vendor/github.com/klauspost/compress/zstd/enc_fast.go +++ b/vendor/github.com/klauspost/compress/zstd/enc_fast.go @@ -11,9 +11,11 @@ import ( ) const ( - tableBits = 15 // Bits used in the table - tableSize = 1 << tableBits // Size of the table - tableMask = tableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. + tableBits = 15 // Bits used in the table + tableSize = 1 << tableBits // Size of the table + tableShardCnt = 1 << (tableBits - dictShardBits) // Number of shards in the table + tableShardSize = tableSize / tableShardCnt // Size of an individual shard + tableMask = tableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. maxMatchLength = 131074 ) @@ -24,8 +26,14 @@ type tableEntry struct { type fastEncoder struct { fastBase - table [tableSize]tableEntry - dictTable []tableEntry + table [tableSize]tableEntry +} + +type fastEncoderDict struct { + fastEncoder + dictTable []tableEntry + tableShardDirty [tableShardCnt]bool + allDirty bool } // Encode mimmics functionality in zstd_fast.c @@ -617,8 +625,322 @@ encodeLoop: } } +// Encode will encode the content, with a dictionary if initialized for it. +func (e *fastEncoderDict) Encode(blk *blockEnc, src []byte) { + const ( + inputMargin = 8 + minNonLiteralBlockSize = 1 + 1 + inputMargin + ) + if e.allDirty || len(src) > 32<<10 { + e.fastEncoder.Encode(blk, src) + e.allDirty = true + return + } + // Protect against e.cur wraparound. + for e.cur >= bufferReset { + if len(e.hist) == 0 { + for i := range e.table[:] { + e.table[i] = tableEntry{} + } + e.cur = e.maxMatchOff + break + } + // Shift down everything in the table that isn't already too far away. + minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff + for i := range e.table[:] { + v := e.table[i].offset + if v < minOff { + v = 0 + } else { + v = v - e.cur + e.maxMatchOff + } + e.table[i].offset = v + } + e.cur = e.maxMatchOff + break + } + + s := e.addBlock(src) + blk.size = len(src) + if len(src) < minNonLiteralBlockSize { + blk.extraLits = len(src) + blk.literals = blk.literals[:len(src)] + copy(blk.literals, src) + return + } + + // Override src + src = e.hist + sLimit := int32(len(src)) - inputMargin + // stepSize is the number of bytes to skip on every main loop iteration. + // It should be >= 2. + const stepSize = 2 + + // TEMPLATE + const hashLog = tableBits + // seems global, but would be nice to tweak. + const kSearchStrength = 7 + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := s + cv := load6432(src, s) + + // Relative offsets + offset1 := int32(blk.recentOffsets[0]) + offset2 := int32(blk.recentOffsets[1]) + + addLiterals := func(s *seq, until int32) { + if until == nextEmit { + return + } + blk.literals = append(blk.literals, src[nextEmit:until]...) + s.litLen = uint32(until - nextEmit) + } + if debug { + println("recent offsets:", blk.recentOffsets) + } + +encodeLoop: + for { + // t will contain the match offset when we find one. + // When existing the search loop, we have already checked 4 bytes. + var t int32 + + // We will not use repeat offsets across blocks. + // By not using them for the first 3 matches + canRepeat := len(blk.sequences) > 2 + + for { + if debugAsserts && canRepeat && offset1 == 0 { + panic("offset0 was 0") + } + + nextHash := hash6(cv, hashLog) + nextHash2 := hash6(cv>>8, hashLog) + candidate := e.table[nextHash] + candidate2 := e.table[nextHash2] + repIndex := s - offset1 + 2 + + e.table[nextHash] = tableEntry{offset: s + e.cur, val: uint32(cv)} + e.markShardDirty(nextHash) + e.table[nextHash2] = tableEntry{offset: s + e.cur + 1, val: uint32(cv >> 8)} + e.markShardDirty(nextHash2) + + if canRepeat && repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>16) { + // Consider history as well. + var seq seq + var length int32 + // length = 4 + e.matchlen(s+6, repIndex+4, src) + { + a := src[s+6:] + b := src[repIndex+4:] + endI := len(a) & (math.MaxInt32 - 7) + length = int32(endI) + 4 + for i := 0; i < endI; i += 8 { + if diff := load64(a, i) ^ load64(b, i); diff != 0 { + length = int32(i+bits.TrailingZeros64(diff)>>3) + 4 + break + } + } + } + + seq.matchLen = uint32(length - zstdMinMatch) + + // We might be able to match backwards. + // Extend as long as we can. + start := s + 2 + // We end the search early, so we don't risk 0 literals + // and have to do special offset treatment. + startLimit := nextEmit + 1 + + sMin := s - e.maxMatchOff + if sMin < 0 { + sMin = 0 + } + for repIndex > sMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch { + repIndex-- + start-- + seq.matchLen++ + } + addLiterals(&seq, start) + + // rep 0 + seq.offset = 1 + if debugSequences { + println("repeat sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + s += length + 2 + nextEmit = s + if s >= sLimit { + if debug { + println("repeat ended", s, length) + + } + break encodeLoop + } + cv = load6432(src, s) + continue + } + coffset0 := s - (candidate.offset - e.cur) + coffset1 := s - (candidate2.offset - e.cur) + 1 + if coffset0 < e.maxMatchOff && uint32(cv) == candidate.val { + // found a regular match + t = candidate.offset - e.cur + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + break + } + + if coffset1 < e.maxMatchOff && uint32(cv>>8) == candidate2.val { + // found a regular match + t = candidate2.offset - e.cur + s++ + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + if debugAsserts && s-t > e.maxMatchOff { + panic("s - t >e.maxMatchOff") + } + if debugAsserts && t < 0 { + panic("t<0") + } + break + } + s += stepSize + ((s - nextEmit) >> (kSearchStrength - 1)) + if s >= sLimit { + break encodeLoop + } + cv = load6432(src, s) + } + // A 4-byte match has been found. We'll later see if more than 4 bytes. + offset2 = offset1 + offset1 = s - t + + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + + if debugAsserts && canRepeat && int(offset1) > len(src) { + panic("invalid offset") + } + + // Extend the 4-byte match as long as possible. + //l := e.matchlen(s+4, t+4, src) + 4 + var l int32 + { + a := src[s+4:] + b := src[t+4:] + endI := len(a) & (math.MaxInt32 - 7) + l = int32(endI) + 4 + for i := 0; i < endI; i += 8 { + if diff := load64(a, i) ^ load64(b, i); diff != 0 { + l = int32(i+bits.TrailingZeros64(diff)>>3) + 4 + break + } + } + } + + // Extend backwards + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength { + s-- + t-- + l++ + } + + // Write our sequence. + var seq seq + seq.litLen = uint32(s - nextEmit) + seq.matchLen = uint32(l - zstdMinMatch) + if seq.litLen > 0 { + blk.literals = append(blk.literals, src[nextEmit:s]...) + } + // Don't use repeat offsets + seq.offset = uint32(s-t) + 3 + s += l + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + nextEmit = s + if s >= sLimit { + break encodeLoop + } + cv = load6432(src, s) + + // Check offset 2 + if o2 := s - offset2; canRepeat && load3232(src, o2) == uint32(cv) { + // We have at least 4 byte match. + // No need to check backwards. We come straight from a match + //l := 4 + e.matchlen(s+4, o2+4, src) + var l int32 + { + a := src[s+4:] + b := src[o2+4:] + endI := len(a) & (math.MaxInt32 - 7) + l = int32(endI) + 4 + for i := 0; i < endI; i += 8 { + if diff := load64(a, i) ^ load64(b, i); diff != 0 { + l = int32(i+bits.TrailingZeros64(diff)>>3) + 4 + break + } + } + } + + // Store this, since we have it. + nextHash := hash6(cv, hashLog) + e.table[nextHash] = tableEntry{offset: s + e.cur, val: uint32(cv)} + e.markShardDirty(nextHash) + seq.matchLen = uint32(l) - zstdMinMatch + seq.litLen = 0 + // Since litlen is always 0, this is offset 1. + seq.offset = 1 + s += l + nextEmit = s + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + // Swap offset 1 and 2. + offset1, offset2 = offset2, offset1 + if s >= sLimit { + break encodeLoop + } + // Prepare next loop. + cv = load6432(src, s) + } + } + + if int(nextEmit) < len(src) { + blk.literals = append(blk.literals, src[nextEmit:]...) + blk.extraLits = len(src) - int(nextEmit) + } + blk.recentOffsets[0] = uint32(offset1) + blk.recentOffsets[1] = uint32(offset2) + if debug { + println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits) + } +} + // ResetDict will reset and set a dictionary if not nil func (e *fastEncoder) Reset(d *dict, singleBlock bool) { + e.resetBase(d, singleBlock) + if d != nil { + panic("fastEncoder: Reset with dict") + } +} + +// ResetDict will reset and set a dictionary if not nil +func (e *fastEncoderDict) Reset(d *dict, singleBlock bool) { e.resetBase(d, singleBlock) if d == nil { return @@ -653,9 +975,44 @@ func (e *fastEncoder) Reset(d *dict, singleBlock bool) { } } e.lastDictID = d.id + e.allDirty = true } e.cur = e.maxMatchOff - // Reset table to initial state - copy(e.table[:], e.dictTable) + dirtyShardCnt := 0 + if !e.allDirty { + for i := range e.tableShardDirty { + if e.tableShardDirty[i] { + dirtyShardCnt++ + } + } + } + + const shardCnt = tableShardCnt + const shardSize = tableShardSize + if e.allDirty || dirtyShardCnt > shardCnt*4/6 { + copy(e.table[:], e.dictTable) + for i := range e.tableShardDirty { + e.tableShardDirty[i] = false + } + e.allDirty = false + return + } + for i := range e.tableShardDirty { + if !e.tableShardDirty[i] { + continue + } + + copy(e.table[i*shardSize:(i+1)*shardSize], e.dictTable[i*shardSize:(i+1)*shardSize]) + e.tableShardDirty[i] = false + } + e.allDirty = false +} + +func (e *fastEncoderDict) markAllShardsDirty() { + e.allDirty = true +} + +func (e *fastEncoderDict) markShardDirty(entryNum uint32) { + e.tableShardDirty[entryNum/tableShardSize] = true } diff --git a/vendor/github.com/klauspost/compress/zstd/encoder.go b/vendor/github.com/klauspost/compress/zstd/encoder.go index f5759211d..6f0265099 100644 --- a/vendor/github.com/klauspost/compress/zstd/encoder.go +++ b/vendor/github.com/klauspost/compress/zstd/encoder.go @@ -106,7 +106,7 @@ func (e *Encoder) Reset(w io.Writer) { s.encoder = e.o.encoder() } if s.writing == nil { - s.writing = &blockEnc{} + s.writing = &blockEnc{lowMem: e.o.lowMem} s.writing.init() } s.writing.initNewEncode() @@ -176,6 +176,12 @@ func (e *Encoder) nextBlock(final bool) error { } if !s.headerWritten { // If we have a single block encode, do a sync compression. + if final && len(s.filling) == 0 && !e.o.fullZero { + s.headerWritten = true + s.fullFrameWritten = true + s.eofWritten = true + return nil + } if final && len(s.filling) > 0 { s.current = e.EncodeAll(s.filling, s.current[:0]) var n2 int @@ -471,7 +477,7 @@ func (e *Encoder) EncodeAll(src, dst []byte) []byte { } // If less than 1MB, allocate a buffer up front. - if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 { + if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !e.o.lowMem { dst = make([]byte, 0, len(src)) } dst, err := fh.appendTo(dst) diff --git a/vendor/github.com/klauspost/compress/zstd/encoder_options.go b/vendor/github.com/klauspost/compress/zstd/encoder_options.go index a7312f42a..18a47eb03 100644 --- a/vendor/github.com/klauspost/compress/zstd/encoder_options.go +++ b/vendor/github.com/klauspost/compress/zstd/encoder_options.go @@ -24,12 +24,12 @@ type encoderOptions struct { allLitEntropy bool customWindow bool customALEntropy bool + lowMem bool dict *dict } func (o *encoderOptions) setDefault() { *o = encoderOptions{ - // use less ram: true for now, but may change. concurrent: runtime.GOMAXPROCS(0), crc: true, single: nil, @@ -37,20 +37,31 @@ func (o *encoderOptions) setDefault() { windowSize: 8 << 20, level: SpeedDefault, allLitEntropy: true, + lowMem: false, } } // encoder returns an encoder with the selected options. func (o encoderOptions) encoder() encoder { switch o.level { - case SpeedDefault: - return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}} - case SpeedBetterCompression: - return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} - case SpeedBestCompression: - return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} case SpeedFastest: - return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} + if o.dict != nil { + return &fastEncoderDict{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}} + } + return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}} + + case SpeedDefault: + if o.dict != nil { + return &doubleFastEncoderDict{fastEncoderDict: fastEncoderDict{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}}} + } + return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}} + case SpeedBetterCompression: + if o.dict != nil { + return &betterFastEncoderDict{betterFastEncoder: betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}} + } + return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}} + case SpeedBestCompression: + return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}} } panic("unknown compression level") } @@ -276,6 +287,17 @@ func WithSingleSegment(b bool) EOption { } } +// WithLowerEncoderMem will trade in some memory cases trade less memory usage for +// slower encoding speed. +// This will not change the window size which is the primary function for reducing +// memory usage. See WithWindowSize. +func WithLowerEncoderMem(b bool) EOption { + return func(o *encoderOptions) error { + o.lowMem = b + return nil + } +} + // WithEncoderDict allows to register a dictionary that will be used for the encode. // The encoder *may* choose to use no dictionary instead for certain payloads. func WithEncoderDict(dict []byte) EOption { diff --git a/vendor/github.com/klauspost/compress/zstd/fse_encoder.go b/vendor/github.com/klauspost/compress/zstd/fse_encoder.go index aa9eba88b..b80709d5e 100644 --- a/vendor/github.com/klauspost/compress/zstd/fse_encoder.go +++ b/vendor/github.com/klauspost/compress/zstd/fse_encoder.go @@ -97,7 +97,7 @@ func (s *fseEncoder) prepare() (*fseEncoder, error) { func (s *fseEncoder) allocCtable() { tableSize := 1 << s.actualTableLog // get tableSymbol that is big enough. - if cap(s.ct.tableSymbol) < int(tableSize) { + if cap(s.ct.tableSymbol) < tableSize { s.ct.tableSymbol = make([]byte, tableSize) } s.ct.tableSymbol = s.ct.tableSymbol[:tableSize] @@ -202,13 +202,13 @@ func (s *fseEncoder) buildCTable() error { case 0: case -1, 1: symbolTT[i].deltaNbBits = tl - symbolTT[i].deltaFindState = int16(total - 1) + symbolTT[i].deltaFindState = total - 1 total++ default: maxBitsOut := uint32(tableLog) - highBit(uint32(v-1)) minStatePlus := uint32(v) << maxBitsOut symbolTT[i].deltaNbBits = (maxBitsOut << 16) - minStatePlus - symbolTT[i].deltaFindState = int16(total - v) + symbolTT[i].deltaFindState = total - v total += v } } @@ -353,8 +353,8 @@ func (s *fseEncoder) normalizeCount2(length int) error { distributed uint32 total = uint32(length) tableLog = s.actualTableLog - lowThreshold = uint32(total >> tableLog) - lowOne = uint32((total * 3) >> (tableLog + 1)) + lowThreshold = total >> tableLog + lowOne = (total * 3) >> (tableLog + 1) ) for i, cnt := range s.count[:s.symbolLen] { if cnt == 0 { @@ -379,7 +379,7 @@ func (s *fseEncoder) normalizeCount2(length int) error { if (total / toDistribute) > lowOne { // risk of rounding to zero - lowOne = uint32((total * 3) / (toDistribute * 2)) + lowOne = (total * 3) / (toDistribute * 2) for i, cnt := range s.count[:s.symbolLen] { if (s.norm[i] == notYetAssigned) && (cnt <= lowOne) { s.norm[i] = 1 diff --git a/vendor/github.com/klauspost/compress/zstd/snappy.go b/vendor/github.com/klauspost/compress/zstd/snappy.go index 841fd95ac..c95fe5111 100644 --- a/vendor/github.com/klauspost/compress/zstd/snappy.go +++ b/vendor/github.com/klauspost/compress/zstd/snappy.go @@ -417,7 +417,7 @@ var crcTable = crc32.MakeTable(crc32.Castagnoli) // https://github.com/google/snappy/blob/master/framing_format.txt func snappyCRC(b []byte) uint32 { c := crc32.Update(0, crcTable, b) - return uint32(c>>15|c<<17) + 0xa282ead8 + return c>>15 | c<<17 + 0xa282ead8 } // snappyDecodedLen returns the length of the decoded block and the number of bytes diff --git a/vendor/modules.txt b/vendor/modules.txt index 003ba8706..7741e99a2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -126,7 +126,7 @@ github.com/jmespath/go-jmespath github.com/jstemmer/go-junit-report github.com/jstemmer/go-junit-report/formatter github.com/jstemmer/go-junit-report/parser -# github.com/klauspost/compress v1.11.12 +# github.com/klauspost/compress v1.11.13 ## explicit github.com/klauspost/compress/flate github.com/klauspost/compress/fse From c54cb3e63cce1380ac43bf86a50be2cb61ba1dbb Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 26 Mar 2021 13:59:46 +0200 Subject: [PATCH 07/18] app/vmagent/remotewrite: remove superflouos code after 1b7dc1e5a5537362234b8660da51ee9887696d9c --- app/vmagent/remotewrite/client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/app/vmagent/remotewrite/client.go b/app/vmagent/remotewrite/client.go index 4596ac261..f56ba3a8a 100644 --- a/app/vmagent/remotewrite/client.go +++ b/app/vmagent/remotewrite/client.go @@ -265,7 +265,6 @@ again: // drop block on 400 status code, // not expected that remote server will be able to handle it on retry // should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149 - _, _ = ioutil.ReadAll(resp.Body) _ = resp.Body.Close() c.packetsDropped.Inc() return true From 78188decf98cd94133f784396da8bde8f2d25b53 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 26 Mar 2021 14:42:36 +0200 Subject: [PATCH 08/18] docs: document that vmagent drops data blocks when remote storage replies with 400 and 409 http status codes This is a follow up for 1b7dc1e5a5537362234b8660da51ee9887696d9c. Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149 --- app/vmagent/README.md | 4 ++++ docs/CHANGELOG.md | 1 + docs/vmagent.md | 4 ++++ 3 files changed, 9 insertions(+) diff --git a/app/vmagent/README.md b/app/vmagent/README.md index 3b00d5153..c6ec547d9 100644 --- a/app/vmagent/README.md +++ b/app/vmagent/README.md @@ -358,6 +358,10 @@ It may be useful to perform `vmagent` rolling update without any scrape loss. Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. Therefore it starts dropping the buffered data if the on-disk buffer size exceeds `-remoteWrite.maxDiskUsagePerURL`. +* `vmagent` drops data blocks if remote storage replies with `400 Bad Request` and `409 Conflict` HTTP responses. The number of dropped blocks can be monitored via `vmagent_remotewrite_packets_dropped_total` metric exported at [/metrics page](#monitoring). + +* Use `-remoteWrite.queues=1` when `-remoteWrite.url` points to remote storage, which doesn't accept out-of-order samples (aka data backfilling). Such storage systems include Prometheus, Cortex and Thanos. + * `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`. The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set. If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index aef0e210e..422f14000 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -17,6 +17,7 @@ * BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars. * BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141 * BUGFIX: vmagent: properly handle `too old resource version` error messages from Kubernetes watch API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1150 +* BUGFIX: vmagent: do not retry sending data blocks if remote storage returns `400 Bad Request` error. The number of dropped blocks due to such errors can be monitored with `vmagent_remotewrite_packets_dropped_total` metrics. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149 # [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0) diff --git a/docs/vmagent.md b/docs/vmagent.md index 3b00d5153..c6ec547d9 100644 --- a/docs/vmagent.md +++ b/docs/vmagent.md @@ -358,6 +358,10 @@ It may be useful to perform `vmagent` rolling update without any scrape loss. Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. Therefore it starts dropping the buffered data if the on-disk buffer size exceeds `-remoteWrite.maxDiskUsagePerURL`. +* `vmagent` drops data blocks if remote storage replies with `400 Bad Request` and `409 Conflict` HTTP responses. The number of dropped blocks can be monitored via `vmagent_remotewrite_packets_dropped_total` metric exported at [/metrics page](#monitoring). + +* Use `-remoteWrite.queues=1` when `-remoteWrite.url` points to remote storage, which doesn't accept out-of-order samples (aka data backfilling). Such storage systems include Prometheus, Cortex and Thanos. + * `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`. The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set. If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory. From 0c403bfd29d48c7dbfc78122ce823ed314c64a36 Mon Sep 17 00:00:00 2001 From: hagen1778 Date: Sun, 28 Mar 2021 20:20:22 +0100 Subject: [PATCH 09/18] deployment: fix typo in vmalert docker-compose definition --- deployment/docker/docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/deployment/docker/docker-compose.yml b/deployment/docker/docker-compose.yml index 31869646c..27208bff1 100644 --- a/deployment/docker/docker-compose.yml +++ b/deployment/docker/docker-compose.yml @@ -71,6 +71,7 @@ services: # display source of alerts in grafana - '-external.url=http://127.0.0.1:3000' #grafana outside container - '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr":"{{$$expr|quotesEscape|crlfEscape|queryEscape}}"},{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]' ## when copypaste the line be aware of '$$' for escaping in '$expr' networks: + networks: - vm_net restart: always alertmanager: From 2601cc0fb003823eeba978ad6ae458e65219da91 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 26 Mar 2021 17:57:51 +0200 Subject: [PATCH 10/18] lib/storage: do not update b.nextIdx if no samples are removed because of retention --- lib/storage/merge.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/lib/storage/merge.go b/lib/storage/merge.go index 2795b2439..e5e7f221d 100644 --- a/lib/storage/merge.go +++ b/lib/storage/merge.go @@ -179,11 +179,14 @@ func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) { timestamps := b.timestamps nextIdx := b.nextIdx + nextIdxOrig := nextIdx for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { nextIdx++ } - atomic.AddUint64(rowsDeleted, uint64(nextIdx-b.nextIdx)) - b.nextIdx = nextIdx + if n := nextIdx - nextIdxOrig; n > 0 { + atomic.AddUint64(rowsDeleted, uint64(n)) + b.nextIdx = nextIdx + } } func appendRows(ob, ib *Block) { From b75c2ce659e6a485fd274bce885577105a71228f Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Mar 2021 12:32:27 +0300 Subject: [PATCH 11/18] lib/uint64set: improve Set.Has() performance scalability on multi-CPU system Do not update bucket32.hint on Set.Has() call, since it leads to memory ping-pong between CPU cores multi-CPU system --- lib/uint64set/uint64set.go | 26 ++------------------------ 1 file changed, 2 insertions(+), 24 deletions(-) diff --git a/lib/uint64set/uint64set.go b/lib/uint64set/uint64set.go index a52d017e4..4262a948d 100644 --- a/lib/uint64set/uint64set.go +++ b/lib/uint64set/uint64set.go @@ -186,19 +186,6 @@ func (s *Set) Has(x uint64) bool { hi32 := uint32(x >> 32) lo32 := uint32(x) bs := s.buckets - if len(bs) > 0 && bs[0].hi == hi32 { - // Manually inline bucket32.has for performance reasons. - hi16 := uint16(lo32 >> 16) - lo16 := uint16(lo32) - b32 := &bs[0] - his := b32.b16his - if n := b32.getHint(); n < uint32(len(his)) && his[n] == hi16 { - // Fast path - check the previously used bucket. - bs := b32.buckets - return n < uint32(len(bs)) && bs[n].has(lo16) - } - return b32.hasSlow(hi16, lo16) - } for i := range bs { b32 := &bs[i] if b32.hi == hi32 { @@ -671,22 +658,13 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 { func (b *bucket32) has(x uint32) bool { hi := uint16(x >> 16) lo := uint16(x) - his := b.b16his - if n := b.getHint(); n < uint32(len(his)) && his[n] == hi { - // Fast path - check the previously used bucket. - bs := b.buckets - return n < uint32(len(bs)) && bs[n].has(lo) - } - return b.hasSlow(hi, lo) -} - -func (b *bucket32) hasSlow(hi, lo uint16) bool { his := b.b16his n := binarySearch16(his, hi) if n < 0 || n >= len(his) || his[n] != hi { return false } - b.setHint(n) + // Do not call b.setHint(n) here, since this may trash performance + // when many concurrent goroutines call b.has() method from many CPU cores. bs := b.buckets return n < len(bs) && bs[n].has(lo) } From b1e49bab52838c44955d340426823fae6fdde574 Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 29 Mar 2021 10:37:17 +0100 Subject: [PATCH 12/18] Dashboards update (#1153) * dashboard: update single node dashboard * add number of new series created over last 24h; * bump version requirements. * dashboard: update vmagent dashboard * add panel for open file descriptors; * add panel for disk I/O; * add panel for `vmagent_remotewrite_packets_dropped_total` metric; * bump version requirements. --- dashboards/victoriametrics.json | 28 +- dashboards/vmagent.json | 518 ++++++++++++++++++++++++++------ 2 files changed, 439 insertions(+), 107 deletions(-) diff --git a/dashboards/victoriametrics.json b/dashboards/victoriametrics.json index 6e6aa5f02..7a7c4d79d 100644 --- a/dashboards/victoriametrics.json +++ b/dashboards/victoriametrics.json @@ -45,12 +45,12 @@ } ] }, - "description": "Overview for single node VictoriaMetrics v1.55.1 or higher", + "description": "Overview for single node VictoriaMetrics v1.56.0 or higher", "editable": true, "gnetId": 10229, "graphTooltip": 0, "id": null, - "iteration": 1615713966732, + "iteration": 1616956884194, "links": [ { "icon": "doc", @@ -2654,7 +2654,7 @@ "dashLength": 10, "dashes": false, "datasource": "$ds", - "description": "Shows how many of new time-series are created every second. High churn rate tightly connected with database performance and may result in unexpected OOM's or slow queries. It is recommended to always keep an eye on this metric to avoid unexpected cardinality \"explosions\".\n\nGood references to read:\n* https://www.robustperception.io/cardinality-is-key\n* https://www.robustperception.io/using-tsdb-analyze-to-investigate-churn-and-cardinality", + "description": "Shows the rate and total number of new series created over last 24h.\n\nHigh churn rate tightly connected with database performance and may result in unexpected OOM's or slow queries. It is recommended to always keep an eye on this metric to avoid unexpected cardinality \"explosions\".\n\nThe higher chur rate is, the more resources required to handle it. Consider to keep the churn rate as low as possible.\n\nGood references to read:\n* https://www.robustperception.io/cardinality-is-key\n* https://www.robustperception.io/using-tsdb-analyze-to-investigate-churn-and-cardinality", "fieldConfig": { "defaults": { "custom": {}, @@ -2668,7 +2668,7 @@ "h": 8, "w": 12, "x": 0, - "y": 85 + "y": 32 }, "hiddenSeries": false, "id": 66, @@ -2689,15 +2689,27 @@ "pointradius": 2, "points": false, "renderer": "flot", - "seriesOverrides": [], + "seriesOverrides": [ + { + "alias": "new series over 24h", + "yaxis": 2 + } + ], "spaceLength": 10, "stack": false, "steppedLine": false, "targets": [ { "expr": "sum(rate(vm_new_timeseries_created_total{job=\"$job\", instance=\"$instance\"}[5m]))", + "interval": "", "legendFormat": "churn rate", "refId": "A" + }, + { + "expr": "sum(increase(vm_new_timeseries_created_total{job=\"$job\", instance=\"$instance\"}[24h]))", + "interval": "", + "legendFormat": "new series over 24h", + "refId": "B" } ], "thresholds": [], @@ -2761,7 +2773,7 @@ "h": 8, "w": 12, "x": 12, - "y": 85 + "y": 32 }, "hiddenSeries": false, "id": 60, @@ -2859,7 +2871,7 @@ "h": 9, "w": 12, "x": 0, - "y": 93 + "y": 40 }, "hiddenSeries": false, "id": 68, @@ -2958,7 +2970,7 @@ "h": 9, "w": 12, "x": 12, - "y": 93 + "y": 40 }, "hiddenSeries": false, "id": 74, diff --git a/dashboards/vmagent.json b/dashboards/vmagent.json index af7520649..c9a94b9e9 100644 --- a/dashboards/vmagent.json +++ b/dashboards/vmagent.json @@ -51,12 +51,12 @@ } ] }, - "description": "Overview for VictoriaMetrics vmagent v1.40.0 or higher", + "description": "Overview for VictoriaMetrics vmagent v1.56.0 or higher", "editable": true, "gnetId": null, "graphTooltip": 1, "id": null, - "iteration": 1598997251171, + "iteration": 1616957263139, "links": [ { "icon": "doc", @@ -1283,6 +1283,101 @@ "alignLevel": null } }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "description": "Shows the rate of dropped data blocks in cases when remote storage replies with `400 Bad Request` and `409 Conflict` HTTP responses.\n\nSee https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 32 + }, + "hiddenSeries": false, + "id": 79, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": true, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(vmagent_remotewrite_packets_dropped_total{job=~\"$job\", instance=~\"$instance\"}[$__interval]))", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Data blocks dropped ($instance)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, { "collapsed": true, "datasource": "$ds", @@ -3105,6 +3200,7 @@ "dashLength": 10, "dashes": false, "datasource": "$ds", + "description": "Panel shows the number of open file descriptors in the OS.\nReaching the limit of open files can cause various issues and must be prevented.\n\nSee how to change limits here https://medium.com/@muhammadtriwibowo/set-permanently-ulimit-n-open-files-in-ubuntu-4d61064429a", "fieldConfig": { "defaults": { "custom": {}, @@ -3121,6 +3217,326 @@ "y": 13 }, "hiddenSeries": false, + "id": 83, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "max", + "color": "#C4162A" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(process_open_fds{job=~\"$job\", instance=~\"$instance\"})", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "open", + "refId": "A" + }, + { + "expr": "min(process_max_fds{job=~\"$job\", instance=~\"$instance\"})", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "max", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Open FDs ($instance)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 2, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 13 + }, + "hiddenSeries": false, + "id": 39, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(go_goroutines{job=~\"$job\", instance=~\"$instance\"}) by(instance)", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "{{instance}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Goroutines ($instance)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": 0, + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "description": "Shows the number of bytes read/write from the storage layer when vmagent has to buffer data on disk or read already buffered data.", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 21 + }, + "hiddenSeries": false, + "id": 81, + "legend": { + "avg": false, + "current": false, + "max": false, + "min": false, + "show": false, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pluginVersion": "7.1.1", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "read", + "transform": "negative-Y" + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "sum(rate(process_io_storage_read_bytes_total{job=~\"$job\", instance=~\"$instance\"}[5m]))", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "read", + "refId": "A" + }, + { + "expr": "sum(rate(process_io_storage_written_bytes_total{job=~\"$job\", instance=~\"$instance\"}[5m]))", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "write", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Disk writes/reads ($instance)", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "decimals": null, + "format": "bytes", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "$ds", + "fieldConfig": { + "defaults": { + "custom": {}, + "links": [] + }, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 21 + }, + "hiddenSeries": false, "id": 41, "legend": { "avg": false, @@ -3195,102 +3611,6 @@ "alignLevel": null } }, - { - "aliasColors": {}, - "bars": false, - "dashLength": 10, - "dashes": false, - "datasource": "$ds", - "fieldConfig": { - "defaults": { - "custom": {}, - "links": [] - }, - "overrides": [] - }, - "fill": 1, - "fillGradient": 0, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 13 - }, - "hiddenSeries": false, - "id": 39, - "legend": { - "avg": false, - "current": false, - "max": false, - "min": false, - "show": false, - "total": false, - "values": false - }, - "lines": true, - "linewidth": 1, - "links": [], - "nullPointMode": "null", - "percentage": false, - "pluginVersion": "7.1.1", - "pointradius": 2, - "points": false, - "renderer": "flot", - "seriesOverrides": [], - "spaceLength": 10, - "stack": false, - "steppedLine": false, - "targets": [ - { - "expr": "sum(go_goroutines{job=~\"$job\", instance=~\"$instance\"}) by(instance)", - "format": "time_series", - "intervalFactor": 2, - "legendFormat": "{{instance}}", - "refId": "A" - } - ], - "thresholds": [], - "timeFrom": null, - "timeRegions": [], - "timeShift": null, - "title": "Goroutines ($instance)", - "tooltip": { - "shared": true, - "sort": 0, - "value_type": "individual" - }, - "type": "graph", - "xaxis": { - "buckets": null, - "mode": "time", - "name": null, - "show": true, - "values": [] - }, - "yaxes": [ - { - "decimals": 0, - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": "0", - "show": true - }, - { - "format": "short", - "label": null, - "logBase": 1, - "max": null, - "min": null, - "show": true - } - ], - "yaxis": { - "align": false, - "alignLevel": null - } - }, { "aliasColors": {}, "bars": false, @@ -3310,7 +3630,7 @@ "h": 8, "w": 12, "x": 0, - "y": 21 + "y": 29 }, "hiddenSeries": false, "id": 43, From cfdb6762e6ac051da8a812ff1ada6e6f8dad0dfc Mon Sep 17 00:00:00 2001 From: Roman Khavronenko Date: Mon, 29 Mar 2021 10:38:03 +0100 Subject: [PATCH 13/18] deployment: add new alert `TooHighChurnRate24h` (#1154) Alert `TooHighChurnRate24h` suppose to cover cases when churn rate is low but results in multiple times higher number than total number of active series. --- deployment/docker/alerts.yml | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/deployment/docker/alerts.yml b/deployment/docker/alerts.yml index a0a32cdbd..3f2c95d04 100644 --- a/deployment/docker/alerts.yml +++ b/deployment/docker/alerts.yml @@ -106,6 +106,23 @@ groups: High Churn Rate tightly connected with database performance and may result in unexpected OOM's or slow queries." + - alert: TooHighChurnRate24h + expr: | + sum(increase(vm_new_timeseries_created_total[24h])) by(instance) + > + (sum(vm_cache_entries{type="storage/hour_metric_ids"}) by(instance) * 3) + for: 15m + labels: + severity: warning + annotations: + dashboard: "http://localhost:3000/d/wNf0q_kZk?viewPanel=66&var-instance={{ $labels.instance }}" + summary: "Too high number of new series on \"{{ $labels.instance }}\" created over last 24h" + description: "The number of created new time series over last 24h is 3x times higher than + current number of active series on \"{{ $labels.instance }}\".\n + This effect is known as Churn Rate.\n + High Churn Rate tightly connected with database performance and may + result in unexpected OOM's or slow queries." + - alert: TooHighSlowInsertsRate expr: | ( From 602a3d99aee55d411034a49ef602cd6061726429 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Mar 2021 13:55:01 +0300 Subject: [PATCH 14/18] docs/CHANGELOG.md: mention optimized query performance on systems with many CPU cores --- docs/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 422f14000..0d0a07b97 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,7 @@ # tip +* FEATURE: optimize query performance by up to 10x on systems with many CPU cores. See [this tweet](https://twitter.com/MetricsVictoria/status/1375064484860067840). * FEATURE: add the following metrics at `/metrics` page for every VictoraMetrics app: * `process_resident_memory_anon_bytes` - RSS share for memory allocated by the process itself. This share cannot be freed by the OS, so it must be taken into account by OOM killer. * `process_resident_memory_file_bytes` - RSS share for page cache memory (aka memory-mapped files). This share can be freed by the OS at any time, so it must be ignored by OOM killer. From 5f4a9f782f2a8b575014bf36e7da39a020e8f95a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Mar 2021 14:27:42 +0300 Subject: [PATCH 15/18] docs/CHANGELOG.md: mention Graphite Render API fixes --- docs/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 0d0a07b97..2ad92b4ab 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -19,6 +19,7 @@ * BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141 * BUGFIX: vmagent: properly handle `too old resource version` error messages from Kubernetes watch API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1150 * BUGFIX: vmagent: do not retry sending data blocks if remote storage returns `400 Bad Request` error. The number of dropped blocks due to such errors can be monitored with `vmagent_remotewrite_packets_dropped_total` metrics. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149 +* BUGFIX: properly calculate `summarize` and `*Series` functions in [Graphite Render API](https://victoriametrics.github.io/#graphite-render-api-usage). # [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0) From 5c7c0b2bda71b4dca22dd6240b010e907151ce75 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Mar 2021 15:42:26 +0300 Subject: [PATCH 16/18] docs/CHANGELOG.md: mention about logging of metrics with too old timestamps in a single-node VictoriaMetrics This is a follow up for aa81039b42b1ebf95250ba7e27b463e54164f7c7 --- docs/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 2ad92b4ab..d701afd31 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -13,6 +13,7 @@ * FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139 * FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144). * FEATURE: vmagent: add `vm_promscrape_discovery_kubernetes_stale_resource_versions_total` metric for monitoring the frequency of `too old resource version` errors during Kubernetes service discovery. +* FEATURE: single-node VictoriaMetrics: log metrics with timestamps older than `-search.cacheTimestampOffset` compared to the current time. See [thse docs](https://victoriametrics.github.io/#backfilling) for details. * BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars. * BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars. From e6fd1f78753a5d6ce94f765652f762ad0391fcd7 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Mar 2021 15:46:35 +0300 Subject: [PATCH 17/18] docs/CHANGELOG.md: typo fixes --- docs/CHANGELOG.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index d701afd31..ce5d96bb7 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -10,10 +10,10 @@ * `process_resident_memory_peak_bytes` - peak RSS usage for the process. * `process_virtual_memory_peak_bytes` - peak virtual memory usage for the process. * FEATURE: accept and enforce `extra_label==` query arg at [Graphite APIs](https://victoriametrics.github.io/#graphite-api-usage). -* FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139 +* FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139). * FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144). * FEATURE: vmagent: add `vm_promscrape_discovery_kubernetes_stale_resource_versions_total` metric for monitoring the frequency of `too old resource version` errors during Kubernetes service discovery. -* FEATURE: single-node VictoriaMetrics: log metrics with timestamps older than `-search.cacheTimestampOffset` compared to the current time. See [thse docs](https://victoriametrics.github.io/#backfilling) for details. +* FEATURE: single-node VictoriaMetrics: log metrics with timestamps older than `-search.cacheTimestampOffset` compared to the current time. See [these docs](https://victoriametrics.github.io/#backfilling) for details. * BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars. * BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars. From 947b37ba8edd855d68ff0205de501bc192610dd8 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Mon, 29 Mar 2021 19:14:49 +0300 Subject: [PATCH 18/18] docs/CHANGELOG.md: cut v1.57.0 --- docs/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index ce5d96bb7..2b71d7293 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,9 @@ # tip + +# [v1.57.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.57.0) + * FEATURE: optimize query performance by up to 10x on systems with many CPU cores. See [this tweet](https://twitter.com/MetricsVictoria/status/1375064484860067840). * FEATURE: add the following metrics at `/metrics` page for every VictoraMetrics app: * `process_resident_memory_anon_bytes` - RSS share for memory allocated by the process itself. This share cannot be freed by the OS, so it must be taken into account by OOM killer.