Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2021-03-29 19:15:52 +03:00
commit 11ded82e60
36 changed files with 2082 additions and 285 deletions

View file

@ -358,6 +358,10 @@ It may be useful to perform `vmagent` rolling update without any scrape loss.
Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. Therefore it starts dropping the buffered data Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. Therefore it starts dropping the buffered data
if the on-disk buffer size exceeds `-remoteWrite.maxDiskUsagePerURL`. if the on-disk buffer size exceeds `-remoteWrite.maxDiskUsagePerURL`.
* `vmagent` drops data blocks if remote storage replies with `400 Bad Request` and `409 Conflict` HTTP responses. The number of dropped blocks can be monitored via `vmagent_remotewrite_packets_dropped_total` metric exported at [/metrics page](#monitoring).
* Use `-remoteWrite.queues=1` when `-remoteWrite.url` points to remote storage, which doesn't accept out-of-order samples (aka data backfilling). Such storage systems include Prometheus, Cortex and Thanos.
* `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`. * `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`.
The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set. The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set.
If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory. If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory.

View file

@ -259,13 +259,13 @@ again:
return true return true
} }
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc() metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
if statusCode == 409 { if statusCode == 409 || statusCode == 400 {
// Just drop block on 409 status code like Prometheus does. // Just drop block on 409 status code like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
body, _ := ioutil.ReadAll(resp.Body) // drop block on 400 status code,
// not expected that remote server will be able to handle it on retry
// should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149
_ = resp.Body.Close() _ = resp.Body.Close()
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+
"response body=%q", len(block), c.sanitizedURL, statusCode, body)
c.packetsDropped.Inc() c.packetsDropped.Inc()
return true return true
} }

View file

@ -29,12 +29,16 @@ var (
// ResetRollupResultCacheIfNeeded resets rollup result cache if mrs contains timestamps outside `now - search.cacheTimestampOffset`. // ResetRollupResultCacheIfNeeded resets rollup result cache if mrs contains timestamps outside `now - search.cacheTimestampOffset`.
func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) { func ResetRollupResultCacheIfNeeded(mrs []storage.MetricRow) {
checkRollupResultCacheResetOnce.Do(func() { checkRollupResultCacheResetOnce.Do(func() {
rollupResultResetMetricRowSample.Store(&storage.MetricRow{})
go checkRollupResultCacheReset() go checkRollupResultCacheReset()
}) })
minTimestamp := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() + checkRollupResultCacheResetInterval.Milliseconds() minTimestamp := int64(fasttime.UnixTimestamp()*1000) - cacheTimestampOffset.Milliseconds() + checkRollupResultCacheResetInterval.Milliseconds()
needCacheReset := false needCacheReset := false
for i := range mrs { for i := range mrs {
if mrs[i].Timestamp < minTimestamp { if mrs[i].Timestamp < minTimestamp {
var mr storage.MetricRow
mr.CopyFrom(&mrs[i])
rollupResultResetMetricRowSample.Store(&mr)
needCacheReset = true needCacheReset = true
break break
} }
@ -49,6 +53,10 @@ func checkRollupResultCacheReset() {
for { for {
time.Sleep(checkRollupResultCacheResetInterval) time.Sleep(checkRollupResultCacheResetInterval)
if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 { if atomic.SwapUint32(&needRollupResultCacheReset, 0) > 0 {
mr := rollupResultResetMetricRowSample.Load().(*storage.MetricRow)
d := int64(fasttime.UnixTimestamp()*1000) - mr.Timestamp - cacheTimestampOffset.Milliseconds()
logger.Warnf("resetting rollup result cache because the metric %s has a timestamp older than -search.cacheTimestampOffset=%s by %.3fs",
mr.String(), cacheTimestampOffset, float64(d)/1e3)
ResetRollupResultCache() ResetRollupResultCache()
} }
} }
@ -58,6 +66,7 @@ const checkRollupResultCacheResetInterval = 5 * time.Second
var needRollupResultCacheReset uint32 var needRollupResultCacheReset uint32
var checkRollupResultCacheResetOnce sync.Once var checkRollupResultCacheResetOnce sync.Once
var rollupResultResetMetricRowSample atomic.Value
var rollupResultCacheV = &rollupResultCache{ var rollupResultCacheV = &rollupResultCache{
c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing. c: workingsetcache.New(1024*1024, time.Hour), // This is a cache for testing.

View file

@ -45,12 +45,12 @@
} }
] ]
}, },
"description": "Overview for single node VictoriaMetrics v1.55.1 or higher", "description": "Overview for single node VictoriaMetrics v1.56.0 or higher",
"editable": true, "editable": true,
"gnetId": 10229, "gnetId": 10229,
"graphTooltip": 0, "graphTooltip": 0,
"id": null, "id": null,
"iteration": 1615713966732, "iteration": 1616956884194,
"links": [ "links": [
{ {
"icon": "doc", "icon": "doc",
@ -2654,7 +2654,7 @@
"dashLength": 10, "dashLength": 10,
"dashes": false, "dashes": false,
"datasource": "$ds", "datasource": "$ds",
"description": "Shows how many of new time-series are created every second. High churn rate tightly connected with database performance and may result in unexpected OOM's or slow queries. It is recommended to always keep an eye on this metric to avoid unexpected cardinality \"explosions\".\n\nGood references to read:\n* https://www.robustperception.io/cardinality-is-key\n* https://www.robustperception.io/using-tsdb-analyze-to-investigate-churn-and-cardinality", "description": "Shows the rate and total number of new series created over last 24h.\n\nHigh churn rate tightly connected with database performance and may result in unexpected OOM's or slow queries. It is recommended to always keep an eye on this metric to avoid unexpected cardinality \"explosions\".\n\nThe higher chur rate is, the more resources required to handle it. Consider to keep the churn rate as low as possible.\n\nGood references to read:\n* https://www.robustperception.io/cardinality-is-key\n* https://www.robustperception.io/using-tsdb-analyze-to-investigate-churn-and-cardinality",
"fieldConfig": { "fieldConfig": {
"defaults": { "defaults": {
"custom": {}, "custom": {},
@ -2668,7 +2668,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 0, "x": 0,
"y": 85 "y": 32
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 66, "id": 66,
@ -2689,15 +2689,27 @@
"pointradius": 2, "pointradius": 2,
"points": false, "points": false,
"renderer": "flot", "renderer": "flot",
"seriesOverrides": [], "seriesOverrides": [
{
"alias": "new series over 24h",
"yaxis": 2
}
],
"spaceLength": 10, "spaceLength": 10,
"stack": false, "stack": false,
"steppedLine": false, "steppedLine": false,
"targets": [ "targets": [
{ {
"expr": "sum(rate(vm_new_timeseries_created_total{job=\"$job\", instance=\"$instance\"}[5m]))", "expr": "sum(rate(vm_new_timeseries_created_total{job=\"$job\", instance=\"$instance\"}[5m]))",
"interval": "",
"legendFormat": "churn rate", "legendFormat": "churn rate",
"refId": "A" "refId": "A"
},
{
"expr": "sum(increase(vm_new_timeseries_created_total{job=\"$job\", instance=\"$instance\"}[24h]))",
"interval": "",
"legendFormat": "new series over 24h",
"refId": "B"
} }
], ],
"thresholds": [], "thresholds": [],
@ -2761,7 +2773,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 12, "x": 12,
"y": 85 "y": 32
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 60, "id": 60,
@ -2859,7 +2871,7 @@
"h": 9, "h": 9,
"w": 12, "w": 12,
"x": 0, "x": 0,
"y": 93 "y": 40
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 68, "id": 68,
@ -2958,7 +2970,7 @@
"h": 9, "h": 9,
"w": 12, "w": 12,
"x": 12, "x": 12,
"y": 93 "y": 40
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 74, "id": 74,

View file

@ -51,12 +51,12 @@
} }
] ]
}, },
"description": "Overview for VictoriaMetrics vmagent v1.40.0 or higher", "description": "Overview for VictoriaMetrics vmagent v1.56.0 or higher",
"editable": true, "editable": true,
"gnetId": null, "gnetId": null,
"graphTooltip": 1, "graphTooltip": 1,
"id": null, "id": null,
"iteration": 1598997251171, "iteration": 1616957263139,
"links": [ "links": [
{ {
"icon": "doc", "icon": "doc",
@ -1283,6 +1283,101 @@
"alignLevel": null "alignLevel": null
} }
}, },
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$ds",
"description": "Shows the rate of dropped data blocks in cases when remote storage replies with `400 Bad Request` and `409 Conflict` HTTP responses.\n\nSee https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149",
"fieldConfig": {
"defaults": {
"custom": {},
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 32
},
"hiddenSeries": false,
"id": 79,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pluginVersion": "7.1.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": true,
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(vmagent_remotewrite_packets_dropped_total{job=~\"$job\", instance=~\"$instance\"}[$__interval]))",
"interval": "",
"legendFormat": "",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Data blocks dropped ($instance)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "bytes",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{ {
"collapsed": true, "collapsed": true,
"datasource": "$ds", "datasource": "$ds",
@ -3105,6 +3200,7 @@
"dashLength": 10, "dashLength": 10,
"dashes": false, "dashes": false,
"datasource": "$ds", "datasource": "$ds",
"description": "Panel shows the number of open file descriptors in the OS.\nReaching the limit of open files can cause various issues and must be prevented.\n\nSee how to change limits here https://medium.com/@muhammadtriwibowo/set-permanently-ulimit-n-open-files-in-ubuntu-4d61064429a",
"fieldConfig": { "fieldConfig": {
"defaults": { "defaults": {
"custom": {}, "custom": {},
@ -3121,6 +3217,326 @@
"y": 13 "y": 13
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 83,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pluginVersion": "7.1.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "max",
"color": "#C4162A"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(process_open_fds{job=~\"$job\", instance=~\"$instance\"})",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "open",
"refId": "A"
},
{
"expr": "min(process_max_fds{job=~\"$job\", instance=~\"$instance\"})",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "max",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Open FDs ($instance)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": null,
"logBase": 2,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$ds",
"fieldConfig": {
"defaults": {
"custom": {},
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 13
},
"hiddenSeries": false,
"id": 39,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pluginVersion": "7.1.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(go_goroutines{job=~\"$job\", instance=~\"$instance\"}) by(instance)",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Goroutines ($instance)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$ds",
"description": "Shows the number of bytes read/write from the storage layer when vmagent has to buffer data on disk or read already buffered data.",
"fieldConfig": {
"defaults": {
"custom": {},
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 21
},
"hiddenSeries": false,
"id": 81,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pluginVersion": "7.1.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "read",
"transform": "negative-Y"
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(rate(process_io_storage_read_bytes_total{job=~\"$job\", instance=~\"$instance\"}[5m]))",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 1,
"legendFormat": "read",
"refId": "A"
},
{
"expr": "sum(rate(process_io_storage_written_bytes_total{job=~\"$job\", instance=~\"$instance\"}[5m]))",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 1,
"legendFormat": "write",
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Disk writes/reads ($instance)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": null,
"format": "bytes",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$ds",
"fieldConfig": {
"defaults": {
"custom": {},
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 21
},
"hiddenSeries": false,
"id": 41, "id": 41,
"legend": { "legend": {
"avg": false, "avg": false,
@ -3195,102 +3611,6 @@
"alignLevel": null "alignLevel": null
} }
}, },
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "$ds",
"fieldConfig": {
"defaults": {
"custom": {},
"links": []
},
"overrides": []
},
"fill": 1,
"fillGradient": 0,
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 13
},
"hiddenSeries": false,
"id": 39,
"legend": {
"avg": false,
"current": false,
"max": false,
"min": false,
"show": false,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pluginVersion": "7.1.1",
"pointradius": 2,
"points": false,
"renderer": "flot",
"seriesOverrides": [],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "sum(go_goroutines{job=~\"$job\", instance=~\"$instance\"}) by(instance)",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
"refId": "A"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "Goroutines ($instance)",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"decimals": 0,
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": "0",
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
],
"yaxis": {
"align": false,
"alignLevel": null
}
},
{ {
"aliasColors": {}, "aliasColors": {},
"bars": false, "bars": false,
@ -3310,7 +3630,7 @@
"h": 8, "h": 8,
"w": 12, "w": 12,
"x": 0, "x": 0,
"y": 21 "y": 29
}, },
"hiddenSeries": false, "hiddenSeries": false,
"id": 43, "id": 43,

View file

@ -106,6 +106,23 @@ groups:
High Churn Rate tightly connected with database performance and may High Churn Rate tightly connected with database performance and may
result in unexpected OOM's or slow queries." result in unexpected OOM's or slow queries."
- alert: TooHighChurnRate24h
expr: |
sum(increase(vm_new_timeseries_created_total[24h])) by(instance)
>
(sum(vm_cache_entries{type="storage/hour_metric_ids"}) by(instance) * 3)
for: 15m
labels:
severity: warning
annotations:
dashboard: "http://localhost:3000/d/wNf0q_kZk?viewPanel=66&var-instance={{ $labels.instance }}"
summary: "Too high number of new series on \"{{ $labels.instance }}\" created over last 24h"
description: "The number of created new time series over last 24h is 3x times higher than
current number of active series on \"{{ $labels.instance }}\".\n
This effect is known as Churn Rate.\n
High Churn Rate tightly connected with database performance and may
result in unexpected OOM's or slow queries."
- alert: TooHighSlowInsertsRate - alert: TooHighSlowInsertsRate
expr: | expr: |
( (

View file

@ -71,6 +71,7 @@ services:
# display source of alerts in grafana # display source of alerts in grafana
- '-external.url=http://127.0.0.1:3000' #grafana outside container - '-external.url=http://127.0.0.1:3000' #grafana outside container
- '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr":"{{$$expr|quotesEscape|crlfEscape|queryEscape}}"},{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]' ## when copypaste the line be aware of '$$' for escaping in '$expr' networks: - '--external.alert.source=explore?orgId=1&left=["now-1h","now","VictoriaMetrics",{"expr":"{{$$expr|quotesEscape|crlfEscape|queryEscape}}"},{"mode":"Metrics"},{"ui":[true,true,true,"none"]}]' ## when copypaste the line be aware of '$$' for escaping in '$expr' networks:
networks:
- vm_net - vm_net
restart: always restart: always
alertmanager: alertmanager:

View file

@ -2,6 +2,10 @@
# tip # tip
# [v1.57.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.57.0)
* FEATURE: optimize query performance by up to 10x on systems with many CPU cores. See [this tweet](https://twitter.com/MetricsVictoria/status/1375064484860067840).
* FEATURE: add the following metrics at `/metrics` page for every VictoraMetrics app: * FEATURE: add the following metrics at `/metrics` page for every VictoraMetrics app:
* `process_resident_memory_anon_bytes` - RSS share for memory allocated by the process itself. This share cannot be freed by the OS, so it must be taken into account by OOM killer. * `process_resident_memory_anon_bytes` - RSS share for memory allocated by the process itself. This share cannot be freed by the OS, so it must be taken into account by OOM killer.
* `process_resident_memory_file_bytes` - RSS share for page cache memory (aka memory-mapped files). This share can be freed by the OS at any time, so it must be ignored by OOM killer. * `process_resident_memory_file_bytes` - RSS share for page cache memory (aka memory-mapped files). This share can be freed by the OS at any time, so it must be ignored by OOM killer.
@ -9,12 +13,17 @@
* `process_resident_memory_peak_bytes` - peak RSS usage for the process. * `process_resident_memory_peak_bytes` - peak RSS usage for the process.
* `process_virtual_memory_peak_bytes` - peak virtual memory usage for the process. * `process_virtual_memory_peak_bytes` - peak virtual memory usage for the process.
* FEATURE: accept and enforce `extra_label=<label_name>=<label_value>` query arg at [Graphite APIs](https://victoriametrics.github.io/#graphite-api-usage). * FEATURE: accept and enforce `extra_label=<label_name>=<label_value>` query arg at [Graphite APIs](https://victoriametrics.github.io/#graphite-api-usage).
* FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139 * FEATURE: use Influx field as metric name if measurement is empty and `-influxSkipSingleField` command-line is set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1139).
* FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144). * FEATURE: vmagent: add `-promscrape.consul.waitTime` command-line flag for tuning the maximum wait time for Consul service discovery. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1144).
* FEATURE: vmagent: add `vm_promscrape_discovery_kubernetes_stale_resource_versions_total` metric for monitoring the frequency of `too old resource version` errors during Kubernetes service discovery.
* FEATURE: single-node VictoriaMetrics: log metrics with timestamps older than `-search.cacheTimestampOffset` compared to the current time. See [these docs](https://victoriametrics.github.io/#backfilling) for details.
* BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars. * BUGFIX: prevent from infinite loop on `{__graphite__="..."}` filters when a metric name contains `*`, `{` or `[` chars.
* BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars. * BUGFIX: prevent from infinite loop in `/metrics/find` and `/metrics/expand` [Graphite Metrics API handlers](https://victoriametrics.github.io/#graphite-metrics-api-usage) when they match metric names or labels with `*`, `{` or `[` chars.
* BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141 * BUGFIX: do not merge duplicate time series during requests to `/api/v1/query`. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1141
* BUGFIX: vmagent: properly handle `too old resource version` error messages from Kubernetes watch API. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1150
* BUGFIX: vmagent: do not retry sending data blocks if remote storage returns `400 Bad Request` error. The number of dropped blocks due to such errors can be monitored with `vmagent_remotewrite_packets_dropped_total` metrics. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1149
* BUGFIX: properly calculate `summarize` and `*Series` functions in [Graphite Render API](https://victoriametrics.github.io/#graphite-render-api-usage).
# [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0) # [v1.56.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.56.0)

View file

@ -272,6 +272,8 @@ the update process. See [cluster availability](#cluster-availability) section fo
- `vminsert` re-routes incoming data from unavailable `vmstorage` nodes to healthy `vmstorage` nodes - `vminsert` re-routes incoming data from unavailable `vmstorage` nodes to healthy `vmstorage` nodes
- `vmselect` continues serving partial responses if at least a single `vmstorage` node is available. If consistency over availability is preferred, then either pass `-search.denyPartialResponse` command-line flag to `vmselect` or pass `deny_partial_response=1` query arg in requests to `vmselect`. - `vmselect` continues serving partial responses if at least a single `vmstorage` node is available. If consistency over availability is preferred, then either pass `-search.denyPartialResponse` command-line flag to `vmselect` or pass `deny_partial_response=1` query arg in requests to `vmselect`.
`vmselect` doesn't serve partial responses for API handlers returning raw datapoints - [`/api/v1/export*` endpoints](https://victoriametrics.github.io/#how-to-export-time-series), since users usually expect this data is always complete.
Data replication can be used for increasing storage durability. See [these docs](#replication-and-data-safety) for details. Data replication can be used for increasing storage durability. See [these docs](#replication-and-data-safety) for details.

View file

@ -358,6 +358,10 @@ It may be useful to perform `vmagent` rolling update without any scrape loss.
Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. Therefore it starts dropping the buffered data Such gaps may appear because `vmagent` cannot keep up with sending the collected data to remote storage. Therefore it starts dropping the buffered data
if the on-disk buffer size exceeds `-remoteWrite.maxDiskUsagePerURL`. if the on-disk buffer size exceeds `-remoteWrite.maxDiskUsagePerURL`.
* `vmagent` drops data blocks if remote storage replies with `400 Bad Request` and `409 Conflict` HTTP responses. The number of dropped blocks can be monitored via `vmagent_remotewrite_packets_dropped_total` metric exported at [/metrics page](#monitoring).
* Use `-remoteWrite.queues=1` when `-remoteWrite.url` points to remote storage, which doesn't accept out-of-order samples (aka data backfilling). Such storage systems include Prometheus, Cortex and Thanos.
* `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`. * `vmagent` buffers scraped data at the `-remoteWrite.tmpDataPath` directory until it is sent to `-remoteWrite.url`.
The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set. The directory can grow large when remote storage is unavailable for extended periods of time and if `-remoteWrite.maxDiskUsagePerURL` isn't set.
If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory. If you don't want to send all the data from the directory to remote storage then simply stop `vmagent` and delete the directory.

2
go.mod
View file

@ -18,7 +18,7 @@ require (
github.com/go-kit/kit v0.10.0 github.com/go-kit/kit v0.10.0
github.com/golang/snappy v0.0.3 github.com/golang/snappy v0.0.3
github.com/influxdata/influxdb v1.8.4 github.com/influxdata/influxdb v1.8.4
github.com/klauspost/compress v1.11.12 github.com/klauspost/compress v1.11.13
github.com/mattn/go-runewidth v0.0.10 // indirect github.com/mattn/go-runewidth v0.0.10 // indirect
github.com/oklog/ulid v1.3.1 github.com/oklog/ulid v1.3.1
github.com/prometheus/client_golang v1.10.0 // indirect github.com/prometheus/client_golang v1.10.0 // indirect

3
go.sum
View file

@ -514,8 +514,9 @@ github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.12 h1:famVnQVu7QwryBN4jNseQdUKES71ZAOnB6UQQJPZvqk=
github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.12/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.13 h1:eSvu8Tmq6j2psUJqJrLcWH6K3w5Dwc+qipbaA6eVEN4=
github.com/klauspost/compress v1.11.13/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=

View file

@ -308,10 +308,11 @@ type urlWatcher struct {
resourceVersion string resourceVersion string
objectsCount *metrics.Counter objectsCount *metrics.Counter
objectsAdded *metrics.Counter objectsAdded *metrics.Counter
objectsRemoved *metrics.Counter objectsRemoved *metrics.Counter
objectsUpdated *metrics.Counter objectsUpdated *metrics.Counter
staleResourceVersions *metrics.Counter
} }
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher { func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
@ -329,10 +330,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
awsPending: make(map[*apiWatcher]struct{}), awsPending: make(map[*apiWatcher]struct{}),
objectsByKey: make(map[string]object), objectsByKey: make(map[string]object),
objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)), objectsCount: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects{role=%q}`, role)),
objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)), objectsAdded: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_added_total{role=%q}`, role)),
objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)), objectsRemoved: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_removed_total{role=%q}`, role)),
objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)), objectsUpdated: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_objects_updated_total{role=%q}`, role)),
staleResourceVersions: metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_stale_resource_versions_total{role=%q}`, role)),
} }
logger.Infof("started %s watcher for %q", uw.role, uw.apiURL) logger.Infof("started %s watcher for %q", uw.role, uw.apiURL)
go uw.watchForUpdates() go uw.watchForUpdates()
@ -502,14 +504,15 @@ func (uw *urlWatcher) watchForUpdates() {
continue continue
} }
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
if resp.StatusCode == 410 { if resp.StatusCode == 410 {
// There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses // There is no need for sleep on 410 error. See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
backoffDelay = time.Second backoffDelay = time.Second
uw.staleResourceVersions.Inc()
uw.setResourceVersion("") uw.setResourceVersion("")
} else { } else {
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code for request to %q: %d; want %d; response: %q", requestURL, resp.StatusCode, http.StatusOK, body)
backoffSleep() backoffSleep()
} }
continue continue
@ -580,13 +583,25 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err) return fmt.Errorf("cannot parse bookmark from %q: %w", we.Object, err)
} }
uw.setResourceVersion(bm.Metadata.ResourceVersion) uw.setResourceVersion(bm.Metadata.ResourceVersion)
case "ERROR":
em, err := parseError(we.Object)
if err != nil {
return fmt.Errorf("cannot parse error message from %q: %w", we.Object, err)
}
if em.Code == 410 {
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#410-gone-responses
uw.staleResourceVersions.Inc()
uw.setResourceVersion("")
return nil
}
return fmt.Errorf("unexpected error message: %q", we.Object)
default: default:
return fmt.Errorf("unexpected WatchEvent type %q for role %q", we.Type, uw.role) return fmt.Errorf("unexpected WatchEvent type %q: %q", we.Type, we.Object)
} }
} }
} }
// Bookmark is a bookmark from Kubernetes Watch API. // Bookmark is a bookmark message from Kubernetes Watch API.
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks // See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
type Bookmark struct { type Bookmark struct {
Metadata struct { Metadata struct {
@ -602,6 +617,19 @@ func parseBookmark(data []byte) (*Bookmark, error) {
return &bm, nil return &bm, nil
} }
// Error is an error message from Kubernetes Watch API.
type Error struct {
Code int
}
func parseError(data []byte) (*Error, error) {
var em Error
if err := json.Unmarshal(data, &em); err != nil {
return nil, err
}
return &em, nil
}
func getAPIPaths(role string, namespaces []string, selectors []Selector) []string { func getAPIPaths(role string, namespaces []string, selectors []Selector) []string {
objectName := getObjectNameByRole(role) objectName := getObjectNameByRole(role)
if objectName == "nodes" || len(namespaces) == 0 { if objectName == "nodes" || len(namespaces) == 0 {

View file

@ -179,11 +179,14 @@ func mergeBlocks(ob, ib1, ib2 *Block, retentionDeadline int64, rowsDeleted *uint
func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) { func skipSamplesOutsideRetention(b *Block, retentionDeadline int64, rowsDeleted *uint64) {
timestamps := b.timestamps timestamps := b.timestamps
nextIdx := b.nextIdx nextIdx := b.nextIdx
nextIdxOrig := nextIdx
for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline { for nextIdx < len(timestamps) && timestamps[nextIdx] < retentionDeadline {
nextIdx++ nextIdx++
} }
atomic.AddUint64(rowsDeleted, uint64(nextIdx-b.nextIdx)) if n := nextIdx - nextIdxOrig; n > 0 {
b.nextIdx = nextIdx atomic.AddUint64(rowsDeleted, uint64(n))
b.nextIdx = nextIdx
}
} }
func appendRows(ob, ib *Block) { func appendRows(ob, ib *Block) {

View file

@ -1272,7 +1272,7 @@ func (mr *MetricRow) String() string {
if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil { if err := mn.unmarshalRaw(mr.MetricNameRaw); err == nil {
metricName = mn.String() metricName = mn.String()
} }
return fmt.Sprintf("MetricName=%s, Timestamp=%d, Value=%f\n", metricName, mr.Timestamp, mr.Value) return fmt.Sprintf("%s (Timestamp=%d, Value=%f)", metricName, mr.Timestamp, mr.Value)
} }
// Marshal appends marshaled mr to dst and returns the result. // Marshal appends marshaled mr to dst and returns the result.

View file

@ -186,19 +186,6 @@ func (s *Set) Has(x uint64) bool {
hi32 := uint32(x >> 32) hi32 := uint32(x >> 32)
lo32 := uint32(x) lo32 := uint32(x)
bs := s.buckets bs := s.buckets
if len(bs) > 0 && bs[0].hi == hi32 {
// Manually inline bucket32.has for performance reasons.
hi16 := uint16(lo32 >> 16)
lo16 := uint16(lo32)
b32 := &bs[0]
his := b32.b16his
if n := b32.getHint(); n < uint32(len(his)) && his[n] == hi16 {
// Fast path - check the previously used bucket.
bs := b32.buckets
return n < uint32(len(bs)) && bs[n].has(lo16)
}
return b32.hasSlow(hi16, lo16)
}
for i := range bs { for i := range bs {
b32 := &bs[i] b32 := &bs[i]
if b32.hi == hi32 { if b32.hi == hi32 {
@ -671,22 +658,13 @@ func (b *bucket32) addBucketAtPos(hi uint16, pos int) *bucket16 {
func (b *bucket32) has(x uint32) bool { func (b *bucket32) has(x uint32) bool {
hi := uint16(x >> 16) hi := uint16(x >> 16)
lo := uint16(x) lo := uint16(x)
his := b.b16his
if n := b.getHint(); n < uint32(len(his)) && his[n] == hi {
// Fast path - check the previously used bucket.
bs := b.buckets
return n < uint32(len(bs)) && bs[n].has(lo)
}
return b.hasSlow(hi, lo)
}
func (b *bucket32) hasSlow(hi, lo uint16) bool {
his := b.b16his his := b.b16his
n := binarySearch16(his, hi) n := binarySearch16(his, hi)
if n < 0 || n >= len(his) || his[n] != hi { if n < 0 || n >= len(his) || his[n] != hi {
return false return false
} }
b.setHint(n) // Do not call b.setHint(n) here, since this may trash performance
// when many concurrent goroutines call b.has() method from many CPU cores.
bs := b.buckets bs := b.buckets
return n < len(bs) && bs[n].has(lo) return n < len(bs) && bs[n].has(lo)
} }

View file

@ -645,15 +645,15 @@ func (d *compressor) init(w io.Writer, level int) (err error) {
d.fill = (*compressor).fillBlock d.fill = (*compressor).fillBlock
d.step = (*compressor).store d.step = (*compressor).store
case level == ConstantCompression: case level == ConstantCompression:
d.w.logNewTablePenalty = 4 d.w.logNewTablePenalty = 8
d.window = make([]byte, maxStoreBlockSize) d.window = make([]byte, 32<<10)
d.fill = (*compressor).fillBlock d.fill = (*compressor).fillBlock
d.step = (*compressor).storeHuff d.step = (*compressor).storeHuff
case level == DefaultCompression: case level == DefaultCompression:
level = 5 level = 5
fallthrough fallthrough
case level >= 1 && level <= 6: case level >= 1 && level <= 6:
d.w.logNewTablePenalty = 6 d.w.logNewTablePenalty = 8
d.fast = newFastEnc(level) d.fast = newFastEnc(level)
d.window = make([]byte, maxStoreBlockSize) d.window = make([]byte, maxStoreBlockSize)
d.fill = (*compressor).fillBlock d.fill = (*compressor).fillBlock

View file

@ -6,6 +6,7 @@
package flate package flate
import ( import (
"encoding/binary"
"fmt" "fmt"
"math/bits" "math/bits"
) )
@ -65,26 +66,15 @@ func load32(b []byte, i int) uint32 {
} }
func load64(b []byte, i int) uint64 { func load64(b []byte, i int) uint64 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read. return binary.LittleEndian.Uint64(b[i:])
b = b[i:]
b = b[:8]
return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 |
uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56
} }
func load3232(b []byte, i int32) uint32 { func load3232(b []byte, i int32) uint32 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read. return binary.LittleEndian.Uint32(b[i:])
b = b[i:]
b = b[:4]
return uint32(b[0]) | uint32(b[1])<<8 | uint32(b[2])<<16 | uint32(b[3])<<24
} }
func load6432(b []byte, i int32) uint64 { func load6432(b []byte, i int32) uint64 {
// Help the compiler eliminate bounds checks on the read so it can be done in a single read. return binary.LittleEndian.Uint64(b[i:])
b = b[i:]
b = b[:8]
return uint64(b[0]) | uint64(b[1])<<8 | uint64(b[2])<<16 | uint64(b[3])<<24 |
uint64(b[4])<<32 | uint64(b[5])<<40 | uint64(b[6])<<48 | uint64(b[7])<<56
} }
func hash(u uint32) uint32 { func hash(u uint32) uint32 {
@ -225,9 +215,9 @@ func (e *fastGen) Reset() {
func matchLen(a, b []byte) int { func matchLen(a, b []byte) int {
b = b[:len(a)] b = b[:len(a)]
var checked int var checked int
if len(a) > 4 { if len(a) >= 4 {
// Try 4 bytes first // Try 4 bytes first
if diff := load32(a, 0) ^ load32(b, 0); diff != 0 { if diff := binary.LittleEndian.Uint32(a) ^ binary.LittleEndian.Uint32(b); diff != 0 {
return bits.TrailingZeros32(diff) >> 3 return bits.TrailingZeros32(diff) >> 3
} }
// Switch to 8 byte matching. // Switch to 8 byte matching.
@ -236,7 +226,7 @@ func matchLen(a, b []byte) int {
b = b[4:] b = b[4:]
for len(a) >= 8 { for len(a) >= 8 {
b = b[:len(a)] b = b[:len(a)]
if diff := load64(a, 0) ^ load64(b, 0); diff != 0 { if diff := binary.LittleEndian.Uint64(a) ^ binary.LittleEndian.Uint64(b); diff != 0 {
return checked + (bits.TrailingZeros64(diff) >> 3) return checked + (bits.TrailingZeros64(diff) >> 3)
} }
checked += 8 checked += 8
@ -247,7 +237,7 @@ func matchLen(a, b []byte) int {
b = b[:len(a)] b = b[:len(a)]
for i := range a { for i := range a {
if a[i] != b[i] { if a[i] != b[i] {
return int(i) + checked return i + checked
} }
} }
return len(a) + checked return len(a) + checked

View file

@ -5,6 +5,7 @@
package flate package flate
import ( import (
"encoding/binary"
"io" "io"
) )
@ -206,7 +207,7 @@ func (w *huffmanBitWriter) write(b []byte) {
} }
func (w *huffmanBitWriter) writeBits(b int32, nb uint16) { func (w *huffmanBitWriter) writeBits(b int32, nb uint16) {
w.bits |= uint64(b) << (w.nbits & reg16SizeMask64) w.bits |= uint64(b) << w.nbits
w.nbits += nb w.nbits += nb
if w.nbits >= 48 { if w.nbits >= 48 {
w.writeOutBits() w.writeOutBits()
@ -420,13 +421,11 @@ func (w *huffmanBitWriter) writeOutBits() {
w.bits >>= 48 w.bits >>= 48
w.nbits -= 48 w.nbits -= 48
n := w.nbytes n := w.nbytes
w.bytes[n] = byte(bits)
w.bytes[n+1] = byte(bits >> 8) // We over-write, but faster...
w.bytes[n+2] = byte(bits >> 16) binary.LittleEndian.PutUint64(w.bytes[n:], bits)
w.bytes[n+3] = byte(bits >> 24)
w.bytes[n+4] = byte(bits >> 32)
w.bytes[n+5] = byte(bits >> 40)
n += 6 n += 6
if n >= bufferFlushSize { if n >= bufferFlushSize {
if w.err != nil { if w.err != nil {
n = 0 n = 0
@ -435,6 +434,7 @@ func (w *huffmanBitWriter) writeOutBits() {
w.write(w.bytes[:n]) w.write(w.bytes[:n])
n = 0 n = 0
} }
w.nbytes = n w.nbytes = n
} }
@ -759,7 +759,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode)
} else { } else {
// inlined // inlined
c := lengths[lengthCode&31] c := lengths[lengthCode&31]
w.bits |= uint64(c.code) << (w.nbits & reg16SizeMask64) w.bits |= uint64(c.code) << w.nbits
w.nbits += c.len w.nbits += c.len
if w.nbits >= 48 { if w.nbits >= 48 {
w.writeOutBits() w.writeOutBits()
@ -779,7 +779,7 @@ func (w *huffmanBitWriter) writeTokens(tokens []token, leCodes, oeCodes []hcode)
} else { } else {
// inlined // inlined
c := offs[offsetCode&31] c := offs[offsetCode&31]
w.bits |= uint64(c.code) << (w.nbits & reg16SizeMask64) w.bits |= uint64(c.code) << w.nbits
w.nbits += c.len w.nbits += c.len
if w.nbits >= 48 { if w.nbits >= 48 {
w.writeOutBits() w.writeOutBits()
@ -830,8 +830,8 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) {
// Assume header is around 70 bytes: // Assume header is around 70 bytes:
// https://stackoverflow.com/a/25454430 // https://stackoverflow.com/a/25454430
const guessHeaderSizeBits = 70 * 8 const guessHeaderSizeBits = 70 * 8
estBits, estExtra := histogramSize(input, w.literalFreq[:], !eof && !sync) estBits := histogramSize(input, w.literalFreq[:], !eof && !sync)
estBits += w.lastHeader + 15 estBits += w.lastHeader + len(input)/32
if w.lastHeader == 0 { if w.lastHeader == 0 {
estBits += guessHeaderSizeBits estBits += guessHeaderSizeBits
} }
@ -845,9 +845,9 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) {
return return
} }
reuseSize := 0
if w.lastHeader > 0 { if w.lastHeader > 0 {
reuseSize := w.literalEncoding.bitLength(w.literalFreq[:256]) reuseSize = w.literalEncoding.bitLength(w.literalFreq[:256])
estBits += estExtra
if estBits < reuseSize { if estBits < reuseSize {
// We owe an EOB // We owe an EOB
@ -859,6 +859,10 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) {
const numLiterals = endBlockMarker + 1 const numLiterals = endBlockMarker + 1
const numOffsets = 1 const numOffsets = 1
if w.lastHeader == 0 { if w.lastHeader == 0 {
if !eof && !sync {
// Generate a slightly suboptimal tree that can be used for all.
fillHist(w.literalFreq[:numLiterals])
}
w.literalFreq[endBlockMarker] = 1 w.literalFreq[endBlockMarker] = 1
w.literalEncoding.generate(w.literalFreq[:numLiterals], 15) w.literalEncoding.generate(w.literalFreq[:numLiterals], 15)
@ -878,19 +882,14 @@ func (w *huffmanBitWriter) writeBlockHuff(eof bool, input []byte, sync bool) {
for _, t := range input { for _, t := range input {
// Bitwriting inlined, ~30% speedup // Bitwriting inlined, ~30% speedup
c := encoding[t] c := encoding[t]
w.bits |= uint64(c.code) << ((w.nbits) & reg16SizeMask64) w.bits |= uint64(c.code) << w.nbits
w.nbits += c.len w.nbits += c.len
if w.nbits >= 48 { if w.nbits >= 48 {
bits := w.bits bits := w.bits
w.bits >>= 48 w.bits >>= 48
w.nbits -= 48 w.nbits -= 48
n := w.nbytes n := w.nbytes
w.bytes[n] = byte(bits) binary.LittleEndian.PutUint64(w.bytes[n:], bits)
w.bytes[n+1] = byte(bits >> 8)
w.bytes[n+2] = byte(bits >> 16)
w.bytes[n+3] = byte(bits >> 24)
w.bytes[n+4] = byte(bits >> 32)
w.bytes[n+5] = byte(bits >> 40)
n += 6 n += 6
if n >= bufferFlushSize { if n >= bufferFlushSize {
if w.err != nil { if w.err != nil {

View file

@ -122,6 +122,16 @@ func (h *huffmanEncoder) bitLength(freq []uint16) int {
return total return total
} }
func (h *huffmanEncoder) bitLengthRaw(b []byte) int {
var total int
for _, f := range b {
if f != 0 {
total += int(h.codes[f].len)
}
}
return total
}
// Return the number of literals assigned to each bit size in the Huffman encoding // Return the number of literals assigned to each bit size in the Huffman encoding
// //
// This method is only called when list.length >= 3 // This method is only called when list.length >= 3
@ -327,37 +337,40 @@ func atLeastOne(v float32) float32 {
return v return v
} }
// Unassigned values are assigned '1' in the histogram.
func fillHist(b []uint16) {
for i, v := range b {
if v == 0 {
b[i] = 1
}
}
}
// histogramSize accumulates a histogram of b in h. // histogramSize accumulates a histogram of b in h.
// An estimated size in bits is returned. // An estimated size in bits is returned.
// Unassigned values are assigned '1' in the histogram.
// len(h) must be >= 256, and h's elements must be all zeroes. // len(h) must be >= 256, and h's elements must be all zeroes.
func histogramSize(b []byte, h []uint16, fill bool) (int, int) { func histogramSize(b []byte, h []uint16, fill bool) (bits int) {
h = h[:256] h = h[:256]
for _, t := range b { for _, t := range b {
h[t]++ h[t]++
} }
invTotal := 1.0 / float32(len(b)) total := len(b)
shannon := float32(0.0)
var extra float32
if fill { if fill {
oneBits := atLeastOne(-mFastLog2(invTotal)) for _, v := range h {
for i, v := range h[:] { if v == 0 {
if v > 0 { total++
n := float32(v)
shannon += atLeastOne(-mFastLog2(n*invTotal)) * n
} else {
h[i] = 1
extra += oneBits
}
}
} else {
for _, v := range h[:] {
if v > 0 {
n := float32(v)
shannon += atLeastOne(-mFastLog2(n*invTotal)) * n
} }
} }
} }
return int(shannon + 0.99), int(extra + 0.99) invTotal := 1.0 / float32(total)
shannon := float32(0.0)
for _, v := range h {
if v > 0 {
n := float32(v)
shannon += atLeastOne(-mFastLog2(n*invTotal)) * n
}
}
return int(shannon + 0.99)
} }

View file

@ -155,7 +155,7 @@ func (e *fastEncL2) Encode(dst *tokens, src []byte) {
// Store every second hash in-between, but offset by 1. // Store every second hash in-between, but offset by 1.
for i := s - l + 2; i < s-5; i += 7 { for i := s - l + 2; i < s-5; i += 7 {
x := load6432(src, int32(i)) x := load6432(src, i)
nextHash := hash4u(uint32(x), bTableBits) nextHash := hash4u(uint32(x), bTableBits)
e.table[nextHash] = tableEntry{offset: e.cur + i} e.table[nextHash] = tableEntry{offset: e.cur + i}
// Skip one // Skip one

View file

@ -301,7 +301,7 @@ func (s *Scratch) writeCount() error {
out[outP+1] = byte(bitStream >> 8) out[outP+1] = byte(bitStream >> 8)
outP += (bitCount + 7) / 8 outP += (bitCount + 7) / 8
if uint16(charnum) > s.symbolLen { if charnum > s.symbolLen {
return errors.New("internal error: charnum > s.symbolLen") return errors.New("internal error: charnum > s.symbolLen")
} }
s.Out = out[:outP] s.Out = out[:outP]
@ -331,7 +331,7 @@ type cTable struct {
func (s *Scratch) allocCtable() { func (s *Scratch) allocCtable() {
tableSize := 1 << s.actualTableLog tableSize := 1 << s.actualTableLog
// get tableSymbol that is big enough. // get tableSymbol that is big enough.
if cap(s.ct.tableSymbol) < int(tableSize) { if cap(s.ct.tableSymbol) < tableSize {
s.ct.tableSymbol = make([]byte, tableSize) s.ct.tableSymbol = make([]byte, tableSize)
} }
s.ct.tableSymbol = s.ct.tableSymbol[:tableSize] s.ct.tableSymbol = s.ct.tableSymbol[:tableSize]
@ -565,8 +565,8 @@ func (s *Scratch) normalizeCount2() error {
distributed uint32 distributed uint32
total = uint32(s.br.remain()) total = uint32(s.br.remain())
tableLog = s.actualTableLog tableLog = s.actualTableLog
lowThreshold = uint32(total >> tableLog) lowThreshold = total >> tableLog
lowOne = uint32((total * 3) >> (tableLog + 1)) lowOne = (total * 3) >> (tableLog + 1)
) )
for i, cnt := range s.count[:s.symbolLen] { for i, cnt := range s.count[:s.symbolLen] {
if cnt == 0 { if cnt == 0 {
@ -591,7 +591,7 @@ func (s *Scratch) normalizeCount2() error {
if (total / toDistribute) > lowOne { if (total / toDistribute) > lowOne {
// risk of rounding to zero // risk of rounding to zero
lowOne = uint32((total * 3) / (toDistribute * 2)) lowOne = (total * 3) / (toDistribute * 2)
for i, cnt := range s.count[:s.symbolLen] { for i, cnt := range s.count[:s.symbolLen] {
if (s.norm[i] == notYetAssigned) && (cnt <= lowOne) { if (s.norm[i] == notYetAssigned) && (cnt <= lowOne) {
s.norm[i] = 1 s.norm[i] = 1

View file

@ -172,7 +172,7 @@ type decSymbol struct {
// allocDtable will allocate decoding tables if they are not big enough. // allocDtable will allocate decoding tables if they are not big enough.
func (s *Scratch) allocDtable() { func (s *Scratch) allocDtable() {
tableSize := 1 << s.actualTableLog tableSize := 1 << s.actualTableLog
if cap(s.decTable) < int(tableSize) { if cap(s.decTable) < tableSize {
s.decTable = make([]decSymbol, tableSize) s.decTable = make([]decSymbol, tableSize)
} }
s.decTable = s.decTable[:tableSize] s.decTable = s.decTable[:tableSize]
@ -340,7 +340,7 @@ type decoder struct {
func (d *decoder) init(in *bitReader, dt []decSymbol, tableLog uint8) { func (d *decoder) init(in *bitReader, dt []decSymbol, tableLog uint8) {
d.dt = dt d.dt = dt
d.br = in d.br = in
d.state = uint16(in.getBits(tableLog)) d.state = in.getBits(tableLog)
} }
// next returns the next symbol and sets the next state. // next returns the next symbol and sets the next state.

View file

@ -403,7 +403,7 @@ func (s *Scratch) buildCTable() error {
var startNode = int16(s.symbolLen) var startNode = int16(s.symbolLen)
nonNullRank := s.symbolLen - 1 nonNullRank := s.symbolLen - 1
nodeNb := int16(startNode) nodeNb := startNode
huffNode := s.nodes[1 : huffNodesLen+1] huffNode := s.nodes[1 : huffNodesLen+1]
// This overlays the slice above, but allows "-1" index lookups. // This overlays the slice above, but allows "-1" index lookups.
@ -580,7 +580,7 @@ func (s *Scratch) setMaxHeight(lastNonNull int) uint8 {
// Get pos of last (smallest) symbol per rank // Get pos of last (smallest) symbol per rank
{ {
currentNbBits := uint8(maxNbBits) currentNbBits := maxNbBits
for pos := int(n); pos >= 0; pos-- { for pos := int(n); pos >= 0; pos-- {
if huffNode[pos].nbBits >= currentNbBits { if huffNode[pos].nbBits >= currentNbBits {
continue continue

View file

@ -94,5 +94,5 @@ var crcTable = crc32.MakeTable(crc32.Castagnoli)
// https://github.com/google/snappy/blob/master/framing_format.txt // https://github.com/google/snappy/blob/master/framing_format.txt
func crc(b []byte) uint32 { func crc(b []byte) uint32 {
c := crc32.Update(0, crcTable, b) c := crc32.Update(0, crcTable, b)
return uint32(c>>15|c<<17) + 0xa282ead8 return c>>15 | c<<17 + 0xa282ead8
} }

View file

@ -22,28 +22,44 @@ type blockEnc struct {
dictLitEnc *huff0.Scratch dictLitEnc *huff0.Scratch
wr bitWriter wr bitWriter
extraLits int extraLits int
last bool
output []byte output []byte
recentOffsets [3]uint32 recentOffsets [3]uint32
prevRecentOffsets [3]uint32 prevRecentOffsets [3]uint32
last bool
lowMem bool
} }
// init should be used once the block has been created. // init should be used once the block has been created.
// If called more than once, the effect is the same as calling reset. // If called more than once, the effect is the same as calling reset.
func (b *blockEnc) init() { func (b *blockEnc) init() {
if cap(b.literals) < maxCompressedLiteralSize { if b.lowMem {
b.literals = make([]byte, 0, maxCompressedLiteralSize) // 1K literals
} if cap(b.literals) < 1<<10 {
const defSeqs = 200 b.literals = make([]byte, 0, 1<<10)
b.literals = b.literals[:0] }
if cap(b.sequences) < defSeqs { const defSeqs = 20
b.sequences = make([]seq, 0, defSeqs) if cap(b.sequences) < defSeqs {
} b.sequences = make([]seq, 0, defSeqs)
if cap(b.output) < maxCompressedBlockSize { }
b.output = make([]byte, 0, maxCompressedBlockSize) // 1K
if cap(b.output) < 1<<10 {
b.output = make([]byte, 0, 1<<10)
}
} else {
if cap(b.literals) < maxCompressedBlockSize {
b.literals = make([]byte, 0, maxCompressedBlockSize)
}
const defSeqs = 200
if cap(b.sequences) < defSeqs {
b.sequences = make([]seq, 0, defSeqs)
}
if cap(b.output) < maxCompressedBlockSize {
b.output = make([]byte, 0, maxCompressedBlockSize)
}
} }
if b.coders.mlEnc == nil { if b.coders.mlEnc == nil {
b.coders.mlEnc = &fseEncoder{} b.coders.mlEnc = &fseEncoder{}
b.coders.mlPrev = &fseEncoder{} b.coders.mlPrev = &fseEncoder{}

View file

@ -7,6 +7,10 @@ import (
"github.com/klauspost/compress/zstd/internal/xxhash" "github.com/klauspost/compress/zstd/internal/xxhash"
) )
const (
dictShardBits = 6
)
type fastBase struct { type fastBase struct {
// cur is the offset at the start of hist // cur is the offset at the start of hist
cur int32 cur int32
@ -17,6 +21,7 @@ type fastBase struct {
tmp [8]byte tmp [8]byte
blk *blockEnc blk *blockEnc
lastDictID uint32 lastDictID uint32
lowMem bool
} }
// CRC returns the underlying CRC writer. // CRC returns the underlying CRC writer.
@ -57,15 +62,10 @@ func (e *fastBase) addBlock(src []byte) int32 {
// check if we have space already // check if we have space already
if len(e.hist)+len(src) > cap(e.hist) { if len(e.hist)+len(src) > cap(e.hist) {
if cap(e.hist) == 0 { if cap(e.hist) == 0 {
l := e.maxMatchOff * 2 e.ensureHist(len(src))
// Make it at least 1MB.
if l < 1<<20 {
l = 1 << 20
}
e.hist = make([]byte, 0, l)
} else { } else {
if cap(e.hist) < int(e.maxMatchOff*2) { if cap(e.hist) < int(e.maxMatchOff+maxCompressedBlockSize) {
panic("unexpected buffer size") panic(fmt.Errorf("unexpected buffer cap %d, want at least %d with window %d", cap(e.hist), e.maxMatchOff+maxCompressedBlockSize, e.maxMatchOff))
} }
// Move down // Move down
offset := int32(len(e.hist)) - e.maxMatchOff offset := int32(len(e.hist)) - e.maxMatchOff
@ -79,6 +79,28 @@ func (e *fastBase) addBlock(src []byte) int32 {
return s return s
} }
// ensureHist will ensure that history can keep at least this many bytes.
func (e *fastBase) ensureHist(n int) {
if cap(e.hist) >= n {
return
}
l := e.maxMatchOff
if (e.lowMem && e.maxMatchOff > maxCompressedBlockSize) || e.maxMatchOff <= maxCompressedBlockSize {
l += maxCompressedBlockSize
} else {
l += e.maxMatchOff
}
// Make it at least 1MB.
if l < 1<<20 && !e.lowMem {
l = 1 << 20
}
// Make it at least the requested size.
if l < int32(n) {
l = int32(n)
}
e.hist = make([]byte, 0, l)
}
// useBlock will replace the block with the provided one, // useBlock will replace the block with the provided one,
// but transfer recent offsets from the previous. // but transfer recent offsets from the previous.
func (e *fastBase) UseBlock(enc *blockEnc) { func (e *fastBase) UseBlock(enc *blockEnc) {
@ -117,7 +139,7 @@ func (e *fastBase) matchlen(s, t int32, src []byte) int32 {
// Reset the encoding table. // Reset the encoding table.
func (e *fastBase) resetBase(d *dict, singleBlock bool) { func (e *fastBase) resetBase(d *dict, singleBlock bool) {
if e.blk == nil { if e.blk == nil {
e.blk = &blockEnc{} e.blk = &blockEnc{lowMem: e.lowMem}
e.blk.init() e.blk.init()
} else { } else {
e.blk.reset(nil) e.blk.reset(nil)

View file

@ -407,6 +407,7 @@ encodeLoop:
// Most notable difference is that src will not be copied for history and // Most notable difference is that src will not be copied for history and
// we do not need to check for max match length. // we do not need to check for max match length.
func (e *bestFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { func (e *bestFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) {
e.ensureHist(len(src))
e.Encode(blk, src) e.Encode(blk, src)
} }

View file

@ -16,6 +16,12 @@ const (
// This greatly depends on the type of input. // This greatly depends on the type of input.
betterShortTableBits = 13 // Bits used in the short match table betterShortTableBits = 13 // Bits used in the short match table
betterShortTableSize = 1 << betterShortTableBits // Size of the table betterShortTableSize = 1 << betterShortTableBits // Size of the table
betterLongTableShardCnt = 1 << (betterLongTableBits - dictShardBits) // Number of shards in the table
betterLongTableShardSize = betterLongTableSize / betterLongTableShardCnt // Size of an individual shard
betterShortTableShardCnt = 1 << (betterShortTableBits - dictShardBits) // Number of shards in the table
betterShortTableShardSize = betterShortTableSize / betterShortTableShardCnt // Size of an individual shard
) )
type prevEntry struct { type prevEntry struct {
@ -31,10 +37,17 @@ type prevEntry struct {
// and that it is longer (lazy matching). // and that it is longer (lazy matching).
type betterFastEncoder struct { type betterFastEncoder struct {
fastBase fastBase
table [betterShortTableSize]tableEntry table [betterShortTableSize]tableEntry
longTable [betterLongTableSize]prevEntry longTable [betterLongTableSize]prevEntry
dictTable []tableEntry }
dictLongTable []prevEntry
type betterFastEncoderDict struct {
betterFastEncoder
dictTable []tableEntry
dictLongTable []prevEntry
shortTableShardDirty [betterShortTableShardCnt]bool
longTableShardDirty [betterLongTableShardCnt]bool
allDirty bool
} }
// Encode improves compression... // Encode improves compression...
@ -516,11 +529,511 @@ encodeLoop:
// Most notable difference is that src will not be copied for history and // Most notable difference is that src will not be copied for history and
// we do not need to check for max match length. // we do not need to check for max match length.
func (e *betterFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { func (e *betterFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) {
e.ensureHist(len(src))
e.Encode(blk, src) e.Encode(blk, src)
} }
// Encode improves compression...
func (e *betterFastEncoderDict) Encode(blk *blockEnc, src []byte) {
const (
// Input margin is the number of bytes we read (8)
// and the maximum we will read ahead (2)
inputMargin = 8 + 2
minNonLiteralBlockSize = 16
)
// Protect against e.cur wraparound.
for e.cur >= bufferReset {
if len(e.hist) == 0 {
for i := range e.table[:] {
e.table[i] = tableEntry{}
}
for i := range e.longTable[:] {
e.longTable[i] = prevEntry{}
}
e.cur = e.maxMatchOff
e.allDirty = true
break
}
// Shift down everything in the table that isn't already too far away.
minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff
for i := range e.table[:] {
v := e.table[i].offset
if v < minOff {
v = 0
} else {
v = v - e.cur + e.maxMatchOff
}
e.table[i].offset = v
}
for i := range e.longTable[:] {
v := e.longTable[i].offset
v2 := e.longTable[i].prev
if v < minOff {
v = 0
v2 = 0
} else {
v = v - e.cur + e.maxMatchOff
if v2 < minOff {
v2 = 0
} else {
v2 = v2 - e.cur + e.maxMatchOff
}
}
e.longTable[i] = prevEntry{
offset: v,
prev: v2,
}
}
e.allDirty = true
e.cur = e.maxMatchOff
break
}
s := e.addBlock(src)
blk.size = len(src)
if len(src) < minNonLiteralBlockSize {
blk.extraLits = len(src)
blk.literals = blk.literals[:len(src)]
copy(blk.literals, src)
return
}
// Override src
src = e.hist
sLimit := int32(len(src)) - inputMargin
// stepSize is the number of bytes to skip on every main loop iteration.
// It should be >= 1.
const stepSize = 1
const kSearchStrength = 9
// nextEmit is where in src the next emitLiteral should start from.
nextEmit := s
cv := load6432(src, s)
// Relative offsets
offset1 := int32(blk.recentOffsets[0])
offset2 := int32(blk.recentOffsets[1])
addLiterals := func(s *seq, until int32) {
if until == nextEmit {
return
}
blk.literals = append(blk.literals, src[nextEmit:until]...)
s.litLen = uint32(until - nextEmit)
}
if debug {
println("recent offsets:", blk.recentOffsets)
}
encodeLoop:
for {
var t int32
// We allow the encoder to optionally turn off repeat offsets across blocks
canRepeat := len(blk.sequences) > 2
var matched int32
for {
if debugAsserts && canRepeat && offset1 == 0 {
panic("offset0 was 0")
}
nextHashS := hash5(cv, betterShortTableBits)
nextHashL := hash8(cv, betterLongTableBits)
candidateL := e.longTable[nextHashL]
candidateS := e.table[nextHashS]
const repOff = 1
repIndex := s - offset1 + repOff
off := s + e.cur
e.longTable[nextHashL] = prevEntry{offset: off, prev: candidateL.offset}
e.markLongShardDirty(nextHashL)
e.table[nextHashS] = tableEntry{offset: off, val: uint32(cv)}
e.markShortShardDirty(nextHashS)
if canRepeat {
if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) {
// Consider history as well.
var seq seq
lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src)
seq.matchLen = uint32(lenght - zstdMinMatch)
// We might be able to match backwards.
// Extend as long as we can.
start := s + repOff
// We end the search early, so we don't risk 0 literals
// and have to do special offset treatment.
startLimit := nextEmit + 1
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
}
for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 {
repIndex--
start--
seq.matchLen++
}
addLiterals(&seq, start)
// rep 0
seq.offset = 1
if debugSequences {
println("repeat sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
// Index match start+1 (long) -> s - 1
index0 := s + repOff
s += lenght + repOff
nextEmit = s
if s >= sLimit {
if debug {
println("repeat ended", s, lenght)
}
break encodeLoop
}
// Index skipped...
for index0 < s-1 {
cv0 := load6432(src, index0)
cv1 := cv0 >> 8
h0 := hash8(cv0, betterLongTableBits)
off := index0 + e.cur
e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset}
e.markLongShardDirty(h0)
h1 := hash5(cv1, betterShortTableBits)
e.table[h1] = tableEntry{offset: off + 1, val: uint32(cv1)}
e.markShortShardDirty(h1)
index0 += 2
}
cv = load6432(src, s)
continue
}
const repOff2 = 1
// We deviate from the reference encoder and also check offset 2.
// Still slower and not much better, so disabled.
// repIndex = s - offset2 + repOff2
if false && repIndex >= 0 && load6432(src, repIndex) == load6432(src, s+repOff) {
// Consider history as well.
var seq seq
lenght := 8 + e.matchlen(s+8+repOff2, repIndex+8, src)
seq.matchLen = uint32(lenght - zstdMinMatch)
// We might be able to match backwards.
// Extend as long as we can.
start := s + repOff2
// We end the search early, so we don't risk 0 literals
// and have to do special offset treatment.
startLimit := nextEmit + 1
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
}
for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 {
repIndex--
start--
seq.matchLen++
}
addLiterals(&seq, start)
// rep 2
seq.offset = 2
if debugSequences {
println("repeat sequence 2", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
index0 := s + repOff2
s += lenght + repOff2
nextEmit = s
if s >= sLimit {
if debug {
println("repeat ended", s, lenght)
}
break encodeLoop
}
// Index skipped...
for index0 < s-1 {
cv0 := load6432(src, index0)
cv1 := cv0 >> 8
h0 := hash8(cv0, betterLongTableBits)
off := index0 + e.cur
e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset}
e.markLongShardDirty(h0)
h1 := hash5(cv1, betterShortTableBits)
e.table[h1] = tableEntry{offset: off + 1, val: uint32(cv1)}
e.markShortShardDirty(h1)
index0 += 2
}
cv = load6432(src, s)
// Swap offsets
offset1, offset2 = offset2, offset1
continue
}
}
// Find the offsets of our two matches.
coffsetL := candidateL.offset - e.cur
coffsetLP := candidateL.prev - e.cur
// Check if we have a long match.
if s-coffsetL < e.maxMatchOff && cv == load6432(src, coffsetL) {
// Found a long match, at least 8 bytes.
matched = e.matchlen(s+8, coffsetL+8, src) + 8
t = coffsetL
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugMatches {
println("long match")
}
if s-coffsetLP < e.maxMatchOff && cv == load6432(src, coffsetLP) {
// Found a long match, at least 8 bytes.
prevMatch := e.matchlen(s+8, coffsetLP+8, src) + 8
if prevMatch > matched {
matched = prevMatch
t = coffsetLP
}
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugMatches {
println("long match")
}
}
break
}
// Check if we have a long match on prev.
if s-coffsetLP < e.maxMatchOff && cv == load6432(src, coffsetLP) {
// Found a long match, at least 8 bytes.
matched = e.matchlen(s+8, coffsetLP+8, src) + 8
t = coffsetLP
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugMatches {
println("long match")
}
break
}
coffsetS := candidateS.offset - e.cur
// Check if we have a short match.
if s-coffsetS < e.maxMatchOff && uint32(cv) == candidateS.val {
// found a regular match
matched = e.matchlen(s+4, coffsetS+4, src) + 4
// See if we can find a long match at s+1
const checkAt = 1
cv := load6432(src, s+checkAt)
nextHashL = hash8(cv, betterLongTableBits)
candidateL = e.longTable[nextHashL]
coffsetL = candidateL.offset - e.cur
// We can store it, since we have at least a 4 byte match.
e.longTable[nextHashL] = prevEntry{offset: s + checkAt + e.cur, prev: candidateL.offset}
e.markLongShardDirty(nextHashL)
if s-coffsetL < e.maxMatchOff && cv == load6432(src, coffsetL) {
// Found a long match, at least 8 bytes.
matchedNext := e.matchlen(s+8+checkAt, coffsetL+8, src) + 8
if matchedNext > matched {
t = coffsetL
s += checkAt
matched = matchedNext
if debugMatches {
println("long match (after short)")
}
break
}
}
// Check prev long...
coffsetL = candidateL.prev - e.cur
if s-coffsetL < e.maxMatchOff && cv == load6432(src, coffsetL) {
// Found a long match, at least 8 bytes.
matchedNext := e.matchlen(s+8+checkAt, coffsetL+8, src) + 8
if matchedNext > matched {
t = coffsetL
s += checkAt
matched = matchedNext
if debugMatches {
println("prev long match (after short)")
}
break
}
}
t = coffsetS
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugAsserts && t < 0 {
panic("t<0")
}
if debugMatches {
println("short match")
}
break
}
// No match found, move forward in input.
s += stepSize + ((s - nextEmit) >> (kSearchStrength - 1))
if s >= sLimit {
break encodeLoop
}
cv = load6432(src, s)
}
// A 4-byte match has been found. Update recent offsets.
// We'll later see if more than 4 bytes.
offset2 = offset1
offset1 = s - t
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && canRepeat && int(offset1) > len(src) {
panic("invalid offset")
}
// Extend the n-byte match as long as possible.
l := matched
// Extend backwards
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
}
for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength {
s--
t--
l++
}
// Write our sequence
var seq seq
seq.litLen = uint32(s - nextEmit)
seq.matchLen = uint32(l - zstdMinMatch)
if seq.litLen > 0 {
blk.literals = append(blk.literals, src[nextEmit:s]...)
}
seq.offset = uint32(s-t) + 3
s += l
if debugSequences {
println("sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
nextEmit = s
if s >= sLimit {
break encodeLoop
}
// Index match start+1 (long) -> s - 1
index0 := s - l + 1
for index0 < s-1 {
cv0 := load6432(src, index0)
cv1 := cv0 >> 8
h0 := hash8(cv0, betterLongTableBits)
off := index0 + e.cur
e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset}
e.markLongShardDirty(h0)
h1 := hash5(cv1, betterShortTableBits)
e.table[h1] = tableEntry{offset: off + 1, val: uint32(cv1)}
e.markShortShardDirty(h1)
index0 += 2
}
cv = load6432(src, s)
if !canRepeat {
continue
}
// Check offset 2
for {
o2 := s - offset2
if load3232(src, o2) != uint32(cv) {
// Do regular search
break
}
// Store this, since we have it.
nextHashS := hash5(cv, betterShortTableBits)
nextHashL := hash8(cv, betterLongTableBits)
// We have at least 4 byte match.
// No need to check backwards. We come straight from a match
l := 4 + e.matchlen(s+4, o2+4, src)
e.longTable[nextHashL] = prevEntry{offset: s + e.cur, prev: e.longTable[nextHashL].offset}
e.markLongShardDirty(nextHashL)
e.table[nextHashS] = tableEntry{offset: s + e.cur, val: uint32(cv)}
e.markShortShardDirty(nextHashS)
seq.matchLen = uint32(l) - zstdMinMatch
seq.litLen = 0
// Since litlen is always 0, this is offset 1.
seq.offset = 1
s += l
nextEmit = s
if debugSequences {
println("sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
// Swap offset 1 and 2.
offset1, offset2 = offset2, offset1
if s >= sLimit {
// Finished
break encodeLoop
}
cv = load6432(src, s)
}
}
if int(nextEmit) < len(src) {
blk.literals = append(blk.literals, src[nextEmit:]...)
blk.extraLits = len(src) - int(nextEmit)
}
blk.recentOffsets[0] = uint32(offset1)
blk.recentOffsets[1] = uint32(offset2)
if debug {
println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits)
}
}
// ResetDict will reset and set a dictionary if not nil // ResetDict will reset and set a dictionary if not nil
func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) { func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) {
e.resetBase(d, singleBlock)
if d != nil {
panic("betterFastEncoder: Reset with dict")
}
}
// ResetDict will reset and set a dictionary if not nil
func (e *betterFastEncoderDict) Reset(d *dict, singleBlock bool) {
e.resetBase(d, singleBlock) e.resetBase(d, singleBlock)
if d == nil { if d == nil {
return return
@ -557,6 +1070,7 @@ func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) {
} }
} }
e.lastDictID = d.id e.lastDictID = d.id
e.allDirty = true
} }
// Init or copy dict table // Init or copy dict table
@ -585,11 +1099,72 @@ func (e *betterFastEncoder) Reset(d *dict, singleBlock bool) {
} }
} }
e.lastDictID = d.id e.lastDictID = d.id
e.allDirty = true
} }
// Reset table to initial state
copy(e.longTable[:], e.dictLongTable)
e.cur = e.maxMatchOff
// Reset table to initial state // Reset table to initial state
copy(e.table[:], e.dictTable) {
dirtyShardCnt := 0
if !e.allDirty {
for i := range e.shortTableShardDirty {
if e.shortTableShardDirty[i] {
dirtyShardCnt++
}
}
}
const shardCnt = betterShortTableShardCnt
const shardSize = betterShortTableShardSize
if e.allDirty || dirtyShardCnt > shardCnt*4/6 {
copy(e.table[:], e.dictTable)
for i := range e.shortTableShardDirty {
e.shortTableShardDirty[i] = false
}
} else {
for i := range e.shortTableShardDirty {
if !e.shortTableShardDirty[i] {
continue
}
copy(e.table[i*shardSize:(i+1)*shardSize], e.dictTable[i*shardSize:(i+1)*shardSize])
e.shortTableShardDirty[i] = false
}
}
}
{
dirtyShardCnt := 0
if !e.allDirty {
for i := range e.shortTableShardDirty {
if e.shortTableShardDirty[i] {
dirtyShardCnt++
}
}
}
const shardCnt = betterLongTableShardCnt
const shardSize = betterLongTableShardSize
if e.allDirty || dirtyShardCnt > shardCnt*4/6 {
copy(e.longTable[:], e.dictLongTable)
for i := range e.longTableShardDirty {
e.longTableShardDirty[i] = false
}
} else {
for i := range e.longTableShardDirty {
if !e.longTableShardDirty[i] {
continue
}
copy(e.longTable[i*shardSize:(i+1)*shardSize], e.dictLongTable[i*shardSize:(i+1)*shardSize])
e.longTableShardDirty[i] = false
}
}
}
e.cur = e.maxMatchOff
e.allDirty = false
}
func (e *betterFastEncoderDict) markLongShardDirty(entryNum uint32) {
e.longTableShardDirty[entryNum/betterLongTableShardSize] = true
}
func (e *betterFastEncoderDict) markShortShardDirty(entryNum uint32) {
e.shortTableShardDirty[entryNum/betterShortTableShardSize] = true
} }

View file

@ -11,6 +11,9 @@ const (
dFastLongTableSize = 1 << dFastLongTableBits // Size of the table dFastLongTableSize = 1 << dFastLongTableBits // Size of the table
dFastLongTableMask = dFastLongTableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. dFastLongTableMask = dFastLongTableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks.
dLongTableShardCnt = 1 << (dFastLongTableBits - dictShardBits) // Number of shards in the table
dLongTableShardSize = dFastLongTableSize / tableShardCnt // Size of an individual shard
dFastShortTableBits = tableBits // Bits used in the short match table dFastShortTableBits = tableBits // Bits used in the short match table
dFastShortTableSize = 1 << dFastShortTableBits // Size of the table dFastShortTableSize = 1 << dFastShortTableBits // Size of the table
dFastShortTableMask = dFastShortTableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. dFastShortTableMask = dFastShortTableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks.
@ -18,8 +21,14 @@ const (
type doubleFastEncoder struct { type doubleFastEncoder struct {
fastEncoder fastEncoder
longTable [dFastLongTableSize]tableEntry longTable [dFastLongTableSize]tableEntry
dictLongTable []tableEntry }
type doubleFastEncoderDict struct {
fastEncoderDict
longTable [dFastLongTableSize]tableEntry
dictLongTable []tableEntry
longTableShardDirty [dLongTableShardCnt]bool
} }
// Encode mimmics functionality in zstd_dfast.c // Encode mimmics functionality in zstd_dfast.c
@ -678,9 +687,379 @@ encodeLoop:
} }
} }
// Encode will encode the content, with a dictionary if initialized for it.
func (e *doubleFastEncoderDict) Encode(blk *blockEnc, src []byte) {
const (
// Input margin is the number of bytes we read (8)
// and the maximum we will read ahead (2)
inputMargin = 8 + 2
minNonLiteralBlockSize = 16
)
// Protect against e.cur wraparound.
for e.cur >= bufferReset {
if len(e.hist) == 0 {
for i := range e.table[:] {
e.table[i] = tableEntry{}
}
for i := range e.longTable[:] {
e.longTable[i] = tableEntry{}
}
e.markAllShardsDirty()
e.cur = e.maxMatchOff
break
}
// Shift down everything in the table that isn't already too far away.
minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff
for i := range e.table[:] {
v := e.table[i].offset
if v < minOff {
v = 0
} else {
v = v - e.cur + e.maxMatchOff
}
e.table[i].offset = v
}
for i := range e.longTable[:] {
v := e.longTable[i].offset
if v < minOff {
v = 0
} else {
v = v - e.cur + e.maxMatchOff
}
e.longTable[i].offset = v
}
e.markAllShardsDirty()
e.cur = e.maxMatchOff
break
}
s := e.addBlock(src)
blk.size = len(src)
if len(src) < minNonLiteralBlockSize {
blk.extraLits = len(src)
blk.literals = blk.literals[:len(src)]
copy(blk.literals, src)
return
}
// Override src
src = e.hist
sLimit := int32(len(src)) - inputMargin
// stepSize is the number of bytes to skip on every main loop iteration.
// It should be >= 1.
const stepSize = 1
const kSearchStrength = 8
// nextEmit is where in src the next emitLiteral should start from.
nextEmit := s
cv := load6432(src, s)
// Relative offsets
offset1 := int32(blk.recentOffsets[0])
offset2 := int32(blk.recentOffsets[1])
addLiterals := func(s *seq, until int32) {
if until == nextEmit {
return
}
blk.literals = append(blk.literals, src[nextEmit:until]...)
s.litLen = uint32(until - nextEmit)
}
if debug {
println("recent offsets:", blk.recentOffsets)
}
encodeLoop:
for {
var t int32
// We allow the encoder to optionally turn off repeat offsets across blocks
canRepeat := len(blk.sequences) > 2
for {
if debugAsserts && canRepeat && offset1 == 0 {
panic("offset0 was 0")
}
nextHashS := hash5(cv, dFastShortTableBits)
nextHashL := hash8(cv, dFastLongTableBits)
candidateL := e.longTable[nextHashL]
candidateS := e.table[nextHashS]
const repOff = 1
repIndex := s - offset1 + repOff
entry := tableEntry{offset: s + e.cur, val: uint32(cv)}
e.longTable[nextHashL] = entry
e.markLongShardDirty(nextHashL)
e.table[nextHashS] = entry
e.markShardDirty(nextHashS)
if canRepeat {
if repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>(repOff*8)) {
// Consider history as well.
var seq seq
lenght := 4 + e.matchlen(s+4+repOff, repIndex+4, src)
seq.matchLen = uint32(lenght - zstdMinMatch)
// We might be able to match backwards.
// Extend as long as we can.
start := s + repOff
// We end the search early, so we don't risk 0 literals
// and have to do special offset treatment.
startLimit := nextEmit + 1
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
}
for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 {
repIndex--
start--
seq.matchLen++
}
addLiterals(&seq, start)
// rep 0
seq.offset = 1
if debugSequences {
println("repeat sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
s += lenght + repOff
nextEmit = s
if s >= sLimit {
if debug {
println("repeat ended", s, lenght)
}
break encodeLoop
}
cv = load6432(src, s)
continue
}
}
// Find the offsets of our two matches.
coffsetL := s - (candidateL.offset - e.cur)
coffsetS := s - (candidateS.offset - e.cur)
// Check if we have a long match.
if coffsetL < e.maxMatchOff && uint32(cv) == candidateL.val {
// Found a long match, likely at least 8 bytes.
// Reference encoder checks all 8 bytes, we only check 4,
// but the likelihood of both the first 4 bytes and the hash matching should be enough.
t = candidateL.offset - e.cur
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugMatches {
println("long match")
}
break
}
// Check if we have a short match.
if coffsetS < e.maxMatchOff && uint32(cv) == candidateS.val {
// found a regular match
// See if we can find a long match at s+1
const checkAt = 1
cv := load6432(src, s+checkAt)
nextHashL = hash8(cv, dFastLongTableBits)
candidateL = e.longTable[nextHashL]
coffsetL = s - (candidateL.offset - e.cur) + checkAt
// We can store it, since we have at least a 4 byte match.
e.longTable[nextHashL] = tableEntry{offset: s + checkAt + e.cur, val: uint32(cv)}
e.markLongShardDirty(nextHashL)
if coffsetL < e.maxMatchOff && uint32(cv) == candidateL.val {
// Found a long match, likely at least 8 bytes.
// Reference encoder checks all 8 bytes, we only check 4,
// but the likelihood of both the first 4 bytes and the hash matching should be enough.
t = candidateL.offset - e.cur
s += checkAt
if debugMatches {
println("long match (after short)")
}
break
}
t = candidateS.offset - e.cur
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugAsserts && t < 0 {
panic("t<0")
}
if debugMatches {
println("short match")
}
break
}
// No match found, move forward in input.
s += stepSize + ((s - nextEmit) >> (kSearchStrength - 1))
if s >= sLimit {
break encodeLoop
}
cv = load6432(src, s)
}
// A 4-byte match has been found. Update recent offsets.
// We'll later see if more than 4 bytes.
offset2 = offset1
offset1 = s - t
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && canRepeat && int(offset1) > len(src) {
panic("invalid offset")
}
// Extend the 4-byte match as long as possible.
l := e.matchlen(s+4, t+4, src) + 4
// Extend backwards
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
}
for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength {
s--
t--
l++
}
// Write our sequence
var seq seq
seq.litLen = uint32(s - nextEmit)
seq.matchLen = uint32(l - zstdMinMatch)
if seq.litLen > 0 {
blk.literals = append(blk.literals, src[nextEmit:s]...)
}
seq.offset = uint32(s-t) + 3
s += l
if debugSequences {
println("sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
nextEmit = s
if s >= sLimit {
break encodeLoop
}
// Index match start+1 (long) and start+2 (short)
index0 := s - l + 1
// Index match end-2 (long) and end-1 (short)
index1 := s - 2
cv0 := load6432(src, index0)
cv1 := load6432(src, index1)
te0 := tableEntry{offset: index0 + e.cur, val: uint32(cv0)}
te1 := tableEntry{offset: index1 + e.cur, val: uint32(cv1)}
longHash1 := hash8(cv0, dFastLongTableBits)
longHash2 := hash8(cv0, dFastLongTableBits)
e.longTable[longHash1] = te0
e.longTable[longHash2] = te1
e.markLongShardDirty(longHash1)
e.markLongShardDirty(longHash2)
cv0 >>= 8
cv1 >>= 8
te0.offset++
te1.offset++
te0.val = uint32(cv0)
te1.val = uint32(cv1)
hashVal1 := hash5(cv0, dFastShortTableBits)
hashVal2 := hash5(cv1, dFastShortTableBits)
e.table[hashVal1] = te0
e.markShardDirty(hashVal1)
e.table[hashVal2] = te1
e.markShardDirty(hashVal2)
cv = load6432(src, s)
if !canRepeat {
continue
}
// Check offset 2
for {
o2 := s - offset2
if load3232(src, o2) != uint32(cv) {
// Do regular search
break
}
// Store this, since we have it.
nextHashS := hash5(cv, dFastShortTableBits)
nextHashL := hash8(cv, dFastLongTableBits)
// We have at least 4 byte match.
// No need to check backwards. We come straight from a match
l := 4 + e.matchlen(s+4, o2+4, src)
entry := tableEntry{offset: s + e.cur, val: uint32(cv)}
e.longTable[nextHashL] = entry
e.markLongShardDirty(nextHashL)
e.table[nextHashS] = entry
e.markShardDirty(nextHashS)
seq.matchLen = uint32(l) - zstdMinMatch
seq.litLen = 0
// Since litlen is always 0, this is offset 1.
seq.offset = 1
s += l
nextEmit = s
if debugSequences {
println("sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
// Swap offset 1 and 2.
offset1, offset2 = offset2, offset1
if s >= sLimit {
// Finished
break encodeLoop
}
cv = load6432(src, s)
}
}
if int(nextEmit) < len(src) {
blk.literals = append(blk.literals, src[nextEmit:]...)
blk.extraLits = len(src) - int(nextEmit)
}
blk.recentOffsets[0] = uint32(offset1)
blk.recentOffsets[1] = uint32(offset2)
if debug {
println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits)
}
// If we encoded more than 64K mark all dirty.
if len(src) > 64<<10 {
e.markAllShardsDirty()
}
}
// ResetDict will reset and set a dictionary if not nil // ResetDict will reset and set a dictionary if not nil
func (e *doubleFastEncoder) Reset(d *dict, singleBlock bool) { func (e *doubleFastEncoder) Reset(d *dict, singleBlock bool) {
e.fastEncoder.Reset(d, singleBlock) e.fastEncoder.Reset(d, singleBlock)
if d != nil {
panic("doubleFastEncoder: Reset with dict not supported")
}
}
// ResetDict will reset and set a dictionary if not nil
func (e *doubleFastEncoderDict) Reset(d *dict, singleBlock bool) {
allDirty := e.allDirty
e.fastEncoderDict.Reset(d, singleBlock)
if d == nil { if d == nil {
return return
} }
@ -706,8 +1085,37 @@ func (e *doubleFastEncoder) Reset(d *dict, singleBlock bool) {
} }
} }
e.lastDictID = d.id e.lastDictID = d.id
e.allDirty = true
} }
// Reset table to initial state // Reset table to initial state
e.cur = e.maxMatchOff e.cur = e.maxMatchOff
copy(e.longTable[:], e.dictLongTable)
dirtyShardCnt := 0
if !allDirty {
for i := range e.longTableShardDirty {
if e.longTableShardDirty[i] {
dirtyShardCnt++
}
}
}
if allDirty || dirtyShardCnt > dLongTableShardCnt/2 {
copy(e.longTable[:], e.dictLongTable)
for i := range e.longTableShardDirty {
e.longTableShardDirty[i] = false
}
return
}
for i := range e.longTableShardDirty {
if !e.longTableShardDirty[i] {
continue
}
copy(e.longTable[i*dLongTableShardSize:(i+1)*dLongTableShardSize], e.dictLongTable[i*dLongTableShardSize:(i+1)*dLongTableShardSize])
e.longTableShardDirty[i] = false
}
}
func (e *doubleFastEncoderDict) markLongShardDirty(entryNum uint32) {
e.longTableShardDirty[entryNum/dLongTableShardSize] = true
} }

View file

@ -11,9 +11,11 @@ import (
) )
const ( const (
tableBits = 15 // Bits used in the table tableBits = 15 // Bits used in the table
tableSize = 1 << tableBits // Size of the table tableSize = 1 << tableBits // Size of the table
tableMask = tableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks. tableShardCnt = 1 << (tableBits - dictShardBits) // Number of shards in the table
tableShardSize = tableSize / tableShardCnt // Size of an individual shard
tableMask = tableSize - 1 // Mask for table indices. Redundant, but can eliminate bounds checks.
maxMatchLength = 131074 maxMatchLength = 131074
) )
@ -24,8 +26,14 @@ type tableEntry struct {
type fastEncoder struct { type fastEncoder struct {
fastBase fastBase
table [tableSize]tableEntry table [tableSize]tableEntry
dictTable []tableEntry }
type fastEncoderDict struct {
fastEncoder
dictTable []tableEntry
tableShardDirty [tableShardCnt]bool
allDirty bool
} }
// Encode mimmics functionality in zstd_fast.c // Encode mimmics functionality in zstd_fast.c
@ -617,8 +625,322 @@ encodeLoop:
} }
} }
// Encode will encode the content, with a dictionary if initialized for it.
func (e *fastEncoderDict) Encode(blk *blockEnc, src []byte) {
const (
inputMargin = 8
minNonLiteralBlockSize = 1 + 1 + inputMargin
)
if e.allDirty || len(src) > 32<<10 {
e.fastEncoder.Encode(blk, src)
e.allDirty = true
return
}
// Protect against e.cur wraparound.
for e.cur >= bufferReset {
if len(e.hist) == 0 {
for i := range e.table[:] {
e.table[i] = tableEntry{}
}
e.cur = e.maxMatchOff
break
}
// Shift down everything in the table that isn't already too far away.
minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff
for i := range e.table[:] {
v := e.table[i].offset
if v < minOff {
v = 0
} else {
v = v - e.cur + e.maxMatchOff
}
e.table[i].offset = v
}
e.cur = e.maxMatchOff
break
}
s := e.addBlock(src)
blk.size = len(src)
if len(src) < minNonLiteralBlockSize {
blk.extraLits = len(src)
blk.literals = blk.literals[:len(src)]
copy(blk.literals, src)
return
}
// Override src
src = e.hist
sLimit := int32(len(src)) - inputMargin
// stepSize is the number of bytes to skip on every main loop iteration.
// It should be >= 2.
const stepSize = 2
// TEMPLATE
const hashLog = tableBits
// seems global, but would be nice to tweak.
const kSearchStrength = 7
// nextEmit is where in src the next emitLiteral should start from.
nextEmit := s
cv := load6432(src, s)
// Relative offsets
offset1 := int32(blk.recentOffsets[0])
offset2 := int32(blk.recentOffsets[1])
addLiterals := func(s *seq, until int32) {
if until == nextEmit {
return
}
blk.literals = append(blk.literals, src[nextEmit:until]...)
s.litLen = uint32(until - nextEmit)
}
if debug {
println("recent offsets:", blk.recentOffsets)
}
encodeLoop:
for {
// t will contain the match offset when we find one.
// When existing the search loop, we have already checked 4 bytes.
var t int32
// We will not use repeat offsets across blocks.
// By not using them for the first 3 matches
canRepeat := len(blk.sequences) > 2
for {
if debugAsserts && canRepeat && offset1 == 0 {
panic("offset0 was 0")
}
nextHash := hash6(cv, hashLog)
nextHash2 := hash6(cv>>8, hashLog)
candidate := e.table[nextHash]
candidate2 := e.table[nextHash2]
repIndex := s - offset1 + 2
e.table[nextHash] = tableEntry{offset: s + e.cur, val: uint32(cv)}
e.markShardDirty(nextHash)
e.table[nextHash2] = tableEntry{offset: s + e.cur + 1, val: uint32(cv >> 8)}
e.markShardDirty(nextHash2)
if canRepeat && repIndex >= 0 && load3232(src, repIndex) == uint32(cv>>16) {
// Consider history as well.
var seq seq
var length int32
// length = 4 + e.matchlen(s+6, repIndex+4, src)
{
a := src[s+6:]
b := src[repIndex+4:]
endI := len(a) & (math.MaxInt32 - 7)
length = int32(endI) + 4
for i := 0; i < endI; i += 8 {
if diff := load64(a, i) ^ load64(b, i); diff != 0 {
length = int32(i+bits.TrailingZeros64(diff)>>3) + 4
break
}
}
}
seq.matchLen = uint32(length - zstdMinMatch)
// We might be able to match backwards.
// Extend as long as we can.
start := s + 2
// We end the search early, so we don't risk 0 literals
// and have to do special offset treatment.
startLimit := nextEmit + 1
sMin := s - e.maxMatchOff
if sMin < 0 {
sMin = 0
}
for repIndex > sMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch {
repIndex--
start--
seq.matchLen++
}
addLiterals(&seq, start)
// rep 0
seq.offset = 1
if debugSequences {
println("repeat sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
s += length + 2
nextEmit = s
if s >= sLimit {
if debug {
println("repeat ended", s, length)
}
break encodeLoop
}
cv = load6432(src, s)
continue
}
coffset0 := s - (candidate.offset - e.cur)
coffset1 := s - (candidate2.offset - e.cur) + 1
if coffset0 < e.maxMatchOff && uint32(cv) == candidate.val {
// found a regular match
t = candidate.offset - e.cur
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
break
}
if coffset1 < e.maxMatchOff && uint32(cv>>8) == candidate2.val {
// found a regular match
t = candidate2.offset - e.cur
s++
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && s-t > e.maxMatchOff {
panic("s - t >e.maxMatchOff")
}
if debugAsserts && t < 0 {
panic("t<0")
}
break
}
s += stepSize + ((s - nextEmit) >> (kSearchStrength - 1))
if s >= sLimit {
break encodeLoop
}
cv = load6432(src, s)
}
// A 4-byte match has been found. We'll later see if more than 4 bytes.
offset2 = offset1
offset1 = s - t
if debugAsserts && s <= t {
panic(fmt.Sprintf("s (%d) <= t (%d)", s, t))
}
if debugAsserts && canRepeat && int(offset1) > len(src) {
panic("invalid offset")
}
// Extend the 4-byte match as long as possible.
//l := e.matchlen(s+4, t+4, src) + 4
var l int32
{
a := src[s+4:]
b := src[t+4:]
endI := len(a) & (math.MaxInt32 - 7)
l = int32(endI) + 4
for i := 0; i < endI; i += 8 {
if diff := load64(a, i) ^ load64(b, i); diff != 0 {
l = int32(i+bits.TrailingZeros64(diff)>>3) + 4
break
}
}
}
// Extend backwards
tMin := s - e.maxMatchOff
if tMin < 0 {
tMin = 0
}
for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength {
s--
t--
l++
}
// Write our sequence.
var seq seq
seq.litLen = uint32(s - nextEmit)
seq.matchLen = uint32(l - zstdMinMatch)
if seq.litLen > 0 {
blk.literals = append(blk.literals, src[nextEmit:s]...)
}
// Don't use repeat offsets
seq.offset = uint32(s-t) + 3
s += l
if debugSequences {
println("sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
nextEmit = s
if s >= sLimit {
break encodeLoop
}
cv = load6432(src, s)
// Check offset 2
if o2 := s - offset2; canRepeat && load3232(src, o2) == uint32(cv) {
// We have at least 4 byte match.
// No need to check backwards. We come straight from a match
//l := 4 + e.matchlen(s+4, o2+4, src)
var l int32
{
a := src[s+4:]
b := src[o2+4:]
endI := len(a) & (math.MaxInt32 - 7)
l = int32(endI) + 4
for i := 0; i < endI; i += 8 {
if diff := load64(a, i) ^ load64(b, i); diff != 0 {
l = int32(i+bits.TrailingZeros64(diff)>>3) + 4
break
}
}
}
// Store this, since we have it.
nextHash := hash6(cv, hashLog)
e.table[nextHash] = tableEntry{offset: s + e.cur, val: uint32(cv)}
e.markShardDirty(nextHash)
seq.matchLen = uint32(l) - zstdMinMatch
seq.litLen = 0
// Since litlen is always 0, this is offset 1.
seq.offset = 1
s += l
nextEmit = s
if debugSequences {
println("sequence", seq, "next s:", s)
}
blk.sequences = append(blk.sequences, seq)
// Swap offset 1 and 2.
offset1, offset2 = offset2, offset1
if s >= sLimit {
break encodeLoop
}
// Prepare next loop.
cv = load6432(src, s)
}
}
if int(nextEmit) < len(src) {
blk.literals = append(blk.literals, src[nextEmit:]...)
blk.extraLits = len(src) - int(nextEmit)
}
blk.recentOffsets[0] = uint32(offset1)
blk.recentOffsets[1] = uint32(offset2)
if debug {
println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits)
}
}
// ResetDict will reset and set a dictionary if not nil // ResetDict will reset and set a dictionary if not nil
func (e *fastEncoder) Reset(d *dict, singleBlock bool) { func (e *fastEncoder) Reset(d *dict, singleBlock bool) {
e.resetBase(d, singleBlock)
if d != nil {
panic("fastEncoder: Reset with dict")
}
}
// ResetDict will reset and set a dictionary if not nil
func (e *fastEncoderDict) Reset(d *dict, singleBlock bool) {
e.resetBase(d, singleBlock) e.resetBase(d, singleBlock)
if d == nil { if d == nil {
return return
@ -653,9 +975,44 @@ func (e *fastEncoder) Reset(d *dict, singleBlock bool) {
} }
} }
e.lastDictID = d.id e.lastDictID = d.id
e.allDirty = true
} }
e.cur = e.maxMatchOff e.cur = e.maxMatchOff
// Reset table to initial state dirtyShardCnt := 0
copy(e.table[:], e.dictTable) if !e.allDirty {
for i := range e.tableShardDirty {
if e.tableShardDirty[i] {
dirtyShardCnt++
}
}
}
const shardCnt = tableShardCnt
const shardSize = tableShardSize
if e.allDirty || dirtyShardCnt > shardCnt*4/6 {
copy(e.table[:], e.dictTable)
for i := range e.tableShardDirty {
e.tableShardDirty[i] = false
}
e.allDirty = false
return
}
for i := range e.tableShardDirty {
if !e.tableShardDirty[i] {
continue
}
copy(e.table[i*shardSize:(i+1)*shardSize], e.dictTable[i*shardSize:(i+1)*shardSize])
e.tableShardDirty[i] = false
}
e.allDirty = false
}
func (e *fastEncoderDict) markAllShardsDirty() {
e.allDirty = true
}
func (e *fastEncoderDict) markShardDirty(entryNum uint32) {
e.tableShardDirty[entryNum/tableShardSize] = true
} }

View file

@ -106,7 +106,7 @@ func (e *Encoder) Reset(w io.Writer) {
s.encoder = e.o.encoder() s.encoder = e.o.encoder()
} }
if s.writing == nil { if s.writing == nil {
s.writing = &blockEnc{} s.writing = &blockEnc{lowMem: e.o.lowMem}
s.writing.init() s.writing.init()
} }
s.writing.initNewEncode() s.writing.initNewEncode()
@ -176,6 +176,12 @@ func (e *Encoder) nextBlock(final bool) error {
} }
if !s.headerWritten { if !s.headerWritten {
// If we have a single block encode, do a sync compression. // If we have a single block encode, do a sync compression.
if final && len(s.filling) == 0 && !e.o.fullZero {
s.headerWritten = true
s.fullFrameWritten = true
s.eofWritten = true
return nil
}
if final && len(s.filling) > 0 { if final && len(s.filling) > 0 {
s.current = e.EncodeAll(s.filling, s.current[:0]) s.current = e.EncodeAll(s.filling, s.current[:0])
var n2 int var n2 int
@ -471,7 +477,7 @@ func (e *Encoder) EncodeAll(src, dst []byte) []byte {
} }
// If less than 1MB, allocate a buffer up front. // If less than 1MB, allocate a buffer up front.
if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 { if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !e.o.lowMem {
dst = make([]byte, 0, len(src)) dst = make([]byte, 0, len(src))
} }
dst, err := fh.appendTo(dst) dst, err := fh.appendTo(dst)

View file

@ -24,12 +24,12 @@ type encoderOptions struct {
allLitEntropy bool allLitEntropy bool
customWindow bool customWindow bool
customALEntropy bool customALEntropy bool
lowMem bool
dict *dict dict *dict
} }
func (o *encoderOptions) setDefault() { func (o *encoderOptions) setDefault() {
*o = encoderOptions{ *o = encoderOptions{
// use less ram: true for now, but may change.
concurrent: runtime.GOMAXPROCS(0), concurrent: runtime.GOMAXPROCS(0),
crc: true, crc: true,
single: nil, single: nil,
@ -37,20 +37,31 @@ func (o *encoderOptions) setDefault() {
windowSize: 8 << 20, windowSize: 8 << 20,
level: SpeedDefault, level: SpeedDefault,
allLitEntropy: true, allLitEntropy: true,
lowMem: false,
} }
} }
// encoder returns an encoder with the selected options. // encoder returns an encoder with the selected options.
func (o encoderOptions) encoder() encoder { func (o encoderOptions) encoder() encoder {
switch o.level { switch o.level {
case SpeedDefault:
return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}}
case SpeedBetterCompression:
return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}
case SpeedBestCompression:
return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}
case SpeedFastest: case SpeedFastest:
return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} if o.dict != nil {
return &fastEncoderDict{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}}
}
return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}
case SpeedDefault:
if o.dict != nil {
return &doubleFastEncoderDict{fastEncoderDict: fastEncoderDict{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}}}
}
return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}}
case SpeedBetterCompression:
if o.dict != nil {
return &betterFastEncoderDict{betterFastEncoder: betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}}
}
return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}
case SpeedBestCompression:
return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize), lowMem: o.lowMem}}
} }
panic("unknown compression level") panic("unknown compression level")
} }
@ -276,6 +287,17 @@ func WithSingleSegment(b bool) EOption {
} }
} }
// WithLowerEncoderMem will trade in some memory cases trade less memory usage for
// slower encoding speed.
// This will not change the window size which is the primary function for reducing
// memory usage. See WithWindowSize.
func WithLowerEncoderMem(b bool) EOption {
return func(o *encoderOptions) error {
o.lowMem = b
return nil
}
}
// WithEncoderDict allows to register a dictionary that will be used for the encode. // WithEncoderDict allows to register a dictionary that will be used for the encode.
// The encoder *may* choose to use no dictionary instead for certain payloads. // The encoder *may* choose to use no dictionary instead for certain payloads.
func WithEncoderDict(dict []byte) EOption { func WithEncoderDict(dict []byte) EOption {

View file

@ -97,7 +97,7 @@ func (s *fseEncoder) prepare() (*fseEncoder, error) {
func (s *fseEncoder) allocCtable() { func (s *fseEncoder) allocCtable() {
tableSize := 1 << s.actualTableLog tableSize := 1 << s.actualTableLog
// get tableSymbol that is big enough. // get tableSymbol that is big enough.
if cap(s.ct.tableSymbol) < int(tableSize) { if cap(s.ct.tableSymbol) < tableSize {
s.ct.tableSymbol = make([]byte, tableSize) s.ct.tableSymbol = make([]byte, tableSize)
} }
s.ct.tableSymbol = s.ct.tableSymbol[:tableSize] s.ct.tableSymbol = s.ct.tableSymbol[:tableSize]
@ -202,13 +202,13 @@ func (s *fseEncoder) buildCTable() error {
case 0: case 0:
case -1, 1: case -1, 1:
symbolTT[i].deltaNbBits = tl symbolTT[i].deltaNbBits = tl
symbolTT[i].deltaFindState = int16(total - 1) symbolTT[i].deltaFindState = total - 1
total++ total++
default: default:
maxBitsOut := uint32(tableLog) - highBit(uint32(v-1)) maxBitsOut := uint32(tableLog) - highBit(uint32(v-1))
minStatePlus := uint32(v) << maxBitsOut minStatePlus := uint32(v) << maxBitsOut
symbolTT[i].deltaNbBits = (maxBitsOut << 16) - minStatePlus symbolTT[i].deltaNbBits = (maxBitsOut << 16) - minStatePlus
symbolTT[i].deltaFindState = int16(total - v) symbolTT[i].deltaFindState = total - v
total += v total += v
} }
} }
@ -353,8 +353,8 @@ func (s *fseEncoder) normalizeCount2(length int) error {
distributed uint32 distributed uint32
total = uint32(length) total = uint32(length)
tableLog = s.actualTableLog tableLog = s.actualTableLog
lowThreshold = uint32(total >> tableLog) lowThreshold = total >> tableLog
lowOne = uint32((total * 3) >> (tableLog + 1)) lowOne = (total * 3) >> (tableLog + 1)
) )
for i, cnt := range s.count[:s.symbolLen] { for i, cnt := range s.count[:s.symbolLen] {
if cnt == 0 { if cnt == 0 {
@ -379,7 +379,7 @@ func (s *fseEncoder) normalizeCount2(length int) error {
if (total / toDistribute) > lowOne { if (total / toDistribute) > lowOne {
// risk of rounding to zero // risk of rounding to zero
lowOne = uint32((total * 3) / (toDistribute * 2)) lowOne = (total * 3) / (toDistribute * 2)
for i, cnt := range s.count[:s.symbolLen] { for i, cnt := range s.count[:s.symbolLen] {
if (s.norm[i] == notYetAssigned) && (cnt <= lowOne) { if (s.norm[i] == notYetAssigned) && (cnt <= lowOne) {
s.norm[i] = 1 s.norm[i] = 1

View file

@ -417,7 +417,7 @@ var crcTable = crc32.MakeTable(crc32.Castagnoli)
// https://github.com/google/snappy/blob/master/framing_format.txt // https://github.com/google/snappy/blob/master/framing_format.txt
func snappyCRC(b []byte) uint32 { func snappyCRC(b []byte) uint32 {
c := crc32.Update(0, crcTable, b) c := crc32.Update(0, crcTable, b)
return uint32(c>>15|c<<17) + 0xa282ead8 return c>>15 | c<<17 + 0xa282ead8
} }
// snappyDecodedLen returns the length of the decoded block and the number of bytes // snappyDecodedLen returns the length of the decoded block and the number of bytes

2
vendor/modules.txt vendored
View file

@ -127,7 +127,7 @@ github.com/jmespath/go-jmespath
github.com/jstemmer/go-junit-report github.com/jstemmer/go-junit-report
github.com/jstemmer/go-junit-report/formatter github.com/jstemmer/go-junit-report/formatter
github.com/jstemmer/go-junit-report/parser github.com/jstemmer/go-junit-report/parser
# github.com/klauspost/compress v1.11.12 # github.com/klauspost/compress v1.11.13
## explicit ## explicit
github.com/klauspost/compress/flate github.com/klauspost/compress/flate
github.com/klauspost/compress/fse github.com/klauspost/compress/fse