app/vmselect: fixes possible panics for multitenant queries
This commit fixes a panic on multitenant query requests and on empty storage node responses for the tenants API. It also optimizes `populateSqTenantTokensIfNeeded` by calling it only once per query request; previously it was incorrectly called once per storage node request.

Related issue: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549

Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: f41gh7 <nik@victoriametrics.com>
parent c73a9cbdaa
commit 3120dc2054

4 changed files with 35 additions and 50 deletions
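The shape of the netstorage fix, as a standalone sketch: tenant-token population is hoisted out of the per-node callback so it runs exactly once per query, and a failure aborts the request up front. Everything below is a simplified stand-in (hypothetical SearchQuery and storageNode types, and a sequential loop in place of the real parallel fan-out via startStorageNodesRequest), not the actual VictoriaMetrics code.

package main

import "fmt"

type SearchQuery struct {
	tenantTokens []string
	populated    bool
}

type storageNode struct{ name string }

// populateTenantTokens stands in for populateSqTenantTokensIfNeeded:
// it resolves the tenants a query applies to and must run once per query.
func populateTenantTokens(sq *SearchQuery) error {
	if sq.populated {
		return nil
	}
	sq.tenantTokens = []string{"tenant-0"} // placeholder tenant discovery
	sq.populated = true
	return nil
}

func (sn *storageNode) deleteSeries(sq *SearchQuery) (int, error) {
	return 1, nil // pretend one series was deleted on this node
}

// deleteSeries mirrors the fixed DeleteSeries: the fallible setup step runs
// once, before the fan-out, instead of inside every per-node callback.
func deleteSeries(sq *SearchQuery, sns []*storageNode) (int, error) {
	if err := populateTenantTokens(sq); err != nil {
		return 0, err
	}
	total := 0
	for _, sn := range sns { // the real code queries nodes in parallel
		n, err := sn.deleteSeries(sq)
		if err != nil {
			return total, err
		}
		total += n
	}
	return total, nil
}

func main() {
	sq := &SearchQuery{}
	n, err := deleteSeries(sq, []*storageNode{{name: "a"}, {name: "b"}})
	fmt.Println(n, err) // 2 <nil>
}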
app/vmselect/netstorage/netstorage.go

@@ -852,15 +852,12 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
 		deletedCount int
 		err          error
 	}
-	sns := getStorageNodes()
-	snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		err := populateSqTenantTokensIfNeeded(sq)
-		if err != nil {
-			return []*nodeResult{{
-				err: err,
-			}}
-		}
+	err := populateSqTenantTokensIfNeeded(sq)
+	if err != nil {
+		return 0, err
+	}
+	sns := getStorageNodes()
+	snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
 		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
 			sn.deleteSeriesRequests.Inc()
 			deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
@@ -876,7 +873,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear

 	// Collect results
 	deletedTotal := 0
-	err := snr.collectAllResults(func(result any) error {
+	err = snr.collectAllResults(func(result any) error {
 		for _, cr := range result.([]any) {
 			nr := cr.(*nodeResult)
 			if nr.err != nil {
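A side effect of the hoisting, visible in the hunk above: `err` is now declared once at function scope by the `populateSqTenantTokensIfNeeded` call, so the later collection step must assign with `=` instead of re-declaring with `:=`. A minimal illustration (hypothetical setup/collect helpers) of why the diff flips that one character:

package main

import (
	"errors"
	"fmt"
)

func setup() error   { return nil }
func collect() error { return errors.New("collect failed") }

func main() {
	err := setup() // declares err at this scope
	if err != nil {
		return
	}
	// err := collect() would not compile here:
	// "no new variables on left side of :="
	err = collect() // plain assignment reuses the existing variable
	fmt.Println(err)
}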
@@ -904,15 +901,12 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 		labelNames []string
 		err        error
 	}
-	sns := getStorageNodes()
-	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		err := populateSqTenantTokensIfNeeded(sq)
-		if err != nil {
-			return []*nodeResult{{
-				err: err,
-			}}
-		}
+	err := populateSqTenantTokensIfNeeded(sq)
+	if err != nil {
+		return nil, false, err
+	}
+	sns := getStorageNodes()
+	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
 		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
 			sn.labelNamesRequests.Inc()
 			labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
@@ -1051,15 +1045,12 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
 		labelValues []string
 		err         error
 	}
-	sns := getStorageNodes()
-	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		err := populateSqTenantTokensIfNeeded(sq)
-		if err != nil {
-			return []*nodeResult{{
-				err: err,
-			}}
-		}
+	err := populateSqTenantTokensIfNeeded(sq)
+	if err != nil {
+		return nil, false, err
+	}
+	sns := getStorageNodes()
+	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
 		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
 			sn.labelValuesRequests.Inc()
 			labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
@@ -1267,15 +1258,12 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
 		status *storage.TSDBStatus
 		err    error
 	}
-	sns := getStorageNodes()
-	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		err := populateSqTenantTokensIfNeeded(sq)
-		if err != nil {
-			return []*nodeResult{{
-				err: err,
-			}}
-		}
+	err := populateSqTenantTokensIfNeeded(sq)
+	if err != nil {
+		return nil, false, err
+	}
+	sns := getStorageNodes()
+	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
 		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
 			sn.tsdbStatusRequests.Inc()
 			status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
@@ -1703,14 +1691,12 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
 		metricNames []string
 		err         error
 	}
-	sns := getStorageNodes()
-	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
-		err := populateSqTenantTokensIfNeeded(sq)
-		if err != nil {
-			return []*nodeResult{{
-				err: err,
-			}}
-		}
+	err := populateSqTenantTokensIfNeeded(sq)
+	if err != nil {
+		return nil, false, err
+	}
+	sns := getStorageNodes()
+	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
 		return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
 			sn.searchMetricNamesRequests.Inc()
 			metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
@@ -1896,14 +1882,12 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
 		return err
 	}

+	err := populateSqTenantTokensIfNeeded(sq)
+	if err != nil {
+		return false, err
+	}
 	// Send the query to all the storage nodes in parallel.
 	snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
-		var err error
-		err = populateSqTenantTokensIfNeeded(sq)
-		if err != nil {
-			return &err
-		}
-
 		res := execSearchQuery(qt, sq, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
 			sn.searchRequests.Inc()
 			err = sn.processSearchQuery(qt, rd, f, workerID, deadline)
app/vmselect/promql/eval.go

@@ -1112,9 +1112,9 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
 		return offset >= maxOffset
 	}

-	at := ec.AuthTokens[0]
-	if ec.IsMultiTenant {
-		at = nil
+	var at *auth.Token
+	if !ec.IsMultiTenant {
+		at = ec.AuthTokens[0]
 	}
 	deleteCachedSeries := func(qt *querytracer.Tracer) {
 		rollupResultCacheV.DeleteInstantValues(qt, at, expr, window, ec.Step, ec.EnforcedTagFilterss)
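Why the old order panicked: `at := ec.AuthTokens[0]` indexes the slice unconditionally, and a multitenant query can arrive with an empty `ec.AuthTokens`, so the expression panics before the `if ec.IsMultiTenant` branch can discard the value. The fixed order only touches the slice on the single-tenant path, where a token is guaranteed. A self-contained illustration with simplified stand-ins for `auth.Token` and `EvalConfig` (hypothetical shapes, not the real structs):

package main

import "fmt"

type Token struct{ AccountID uint32 }

type EvalConfig struct {
	AuthTokens    []*Token
	IsMultiTenant bool
}

// cacheToken mirrors the fixed ordering: index the slice only when a
// token is guaranteed to be present.
func cacheToken(ec *EvalConfig) *Token {
	// The old order did the reverse:
	//   at := ec.AuthTokens[0] // panics: index out of range [0] with length 0
	//   if ec.IsMultiTenant { at = nil }
	var at *Token
	if !ec.IsMultiTenant {
		at = ec.AuthTokens[0]
	}
	return at
}

func main() {
	ec := &EvalConfig{IsMultiTenant: true} // multitenant query: AuthTokens empty
	fmt.Println(cacheToken(ec))            // <nil>, no panic
}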
app/vmselect/promql/rollup_result_cache.go

@@ -236,9 +236,9 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig,
 	// Obtain tss from the cache.
 	bb := bbPool.Get()
 	defer bbPool.Put(bb)
-	at := ec.AuthTokens[0]
-	if ec.IsMultiTenant {
-		at = nil
+	var at *auth.Token
+	if !ec.IsMultiTenant {
+		at = ec.AuthTokens[0]
 	}

 	bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
@@ -369,9 +369,9 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
 	metainfoBuf := bbPool.Get()
 	defer bbPool.Put(metainfoBuf)

-	at := ec.AuthTokens[0]
-	if ec.IsMultiTenant {
-		at = nil
+	var at *auth.Token
+	if !ec.IsMultiTenant {
+		at = ec.AuthTokens[0]
 	}
 	metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
 	metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B)
docs/CHANGELOG.md

@@ -26,6 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details.
 * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly inherit [`drop_src_path_prefix_parts`](https://docs.victoriametrics.com/vmauth/#dropping-request-path-prefix), [`load_balancing_policy`](https://docs.victoriametrics.com/vmauth/#high-availability), [`retry_status_codes`](https://docs.victoriametrics.com/vmauth/#load-balancing) and [`discover_backend_ips`](https://docs.victoriametrics.com/vmauth/#discovering-backend-ips) options by `url_map` entries if `url_prefix` option isn't set at the [user config level](https://docs.victoriametrics.com/vmauth/#auth-config). These options were inherited only when the `url_prefix` option was set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7519).
 * BUGFIX: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards): add `file` label filter to vmalert dashboard panels. Previously, metrics from groups with the same name but different rule files could be mixed in the results.
+* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): properly handle [multitenant](https://docs.victoriametrics.com/cluster-victoriametrics/#multitenancy-via-labels) query request errors and correctly perform search for available tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549) for details.

 ## [v1.106.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.106.0)